http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
new file mode 100644
index 0000000..19470db
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -0,0 +1,776 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Transaction prepare request for optimistic and eventually consistent
+ * transactions.
+ */
+public class GridDistributedTxPrepareRequest<K, V> extends 
GridDistributedBaseMessage<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Thread ID. */
+    @GridToStringInclude
+    private long threadId;
+
+    /** Transaction concurrency. */
+    @GridToStringInclude
+    private IgniteTxConcurrency concurrency;
+
+    /** Transaction isolation. */
+    @GridToStringInclude
+    private IgniteTxIsolation isolation;
+
+    /** Commit version for EC transactions. */
+    @GridToStringInclude
+    private GridCacheVersion commitVer;
+
+    /** Transaction timeout. */
+    @GridToStringInclude
+    private long timeout;
+
+    /** Invalidation flag. */
+    @GridToStringInclude
+    private boolean invalidate;
+
+    /** Transaction read set. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Collection<IgniteTxEntry<K, V>> reads;
+
+    /** Serialized read set: one byte array per entry of {@link #reads}. */
+    @GridDirectCollection(byte[].class)
+    private Collection<byte[]> readsBytes;
+
+    /** Transaction write entries. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Collection<IgniteTxEntry<K, V>> writes;
+
+    /** Serialized write entries: one byte array per entry of {@link #writes}. */
+    @GridDirectCollection(byte[].class)
+    private Collection<byte[]> writesBytes;
+
+    /** DHT versions to verify. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Map<IgniteTxKey<K>, GridCacheVersion> dhtVers;
+
+    /** Serialized form of the {@link #dhtVers} map. */
+    @GridToStringExclude
+    private byte[] dhtVersBytes;
+
+    /** Group lock key, if any. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private IgniteTxKey grpLockKey;
+
+    /** Group lock key bytes. */
+    @GridToStringExclude
+    private byte[] grpLockKeyBytes;
+
+    /** Partition lock flag. */
+    private boolean partLock;
+
+    /** Expected transaction size. */
+    private int txSize;
+
+    /** Transaction nodes mapping (primary node -> related backup nodes). */
+    @GridDirectTransient
+    private Map<UUID, Collection<UUID>> txNodes;
+
+    /** Serialized form of the {@link #txNodes} mapping. */
+    private byte[] txNodesBytes;
+
+    /** System flag. */
+    private boolean sys;
+
+    /**
+     * Creates an empty request. The no-argument constructor is required by {@link Externalizable}
+     * and is also the one used by {@link #clone()}.
+     */
+    public GridDistributedTxPrepareRequest() {
+        /* No-op. */
+    }
+
+    /**
+     * Creates a prepare request for the given transaction.
+     * <p>
+     * Thread ID, concurrency, isolation, timeout, invalidate flag, size and system flag are copied
+     * from {@code tx}; the commit version is left {@code null}.
+     *
+     * @param tx Cache transaction.
+     * @param reads Read entries.
+     * @param writes Write entries.
+     * @param grpLockKey Group lock key.
+     * @param partLock {@code True} if preparing group-lock transaction with partition lock.
+     * @param txNodes Transaction nodes mapping.
+     */
+    public GridDistributedTxPrepareRequest(
+        IgniteTxEx<K, V> tx,
+        @Nullable Collection<IgniteTxEntry<K, V>> reads,
+        Collection<IgniteTxEntry<K, V>> writes,
+        IgniteTxKey grpLockKey,
+        boolean partLock,
+        Map<UUID, Collection<UUID>> txNodes
+    ) {
+        super(tx.xidVersion(), 0);
+
+        // Values handed in directly by the caller.
+        this.reads = reads;
+        this.writes = writes;
+        this.grpLockKey = grpLockKey;
+        this.partLock = partLock;
+        this.txNodes = txNodes;
+
+        // Values taken over from the transaction.
+        this.commitVer = null;
+        this.threadId = tx.threadId();
+        this.concurrency = tx.concurrency();
+        this.isolation = tx.isolation();
+        this.timeout = tx.timeout();
+        this.invalidate = tx.isInvalidate();
+        this.txSize = tx.size();
+        this.sys = tx.system();
+    }
+
+    /**
+     * Gets the mapping of primary nodes to their related backup nodes.
+     *
+     * @return Transaction nodes mapping.
+     */
+    public Map<UUID, Collection<UUID>> transactionNodes() {
+        return this.txNodes;
+    }
+
+    /**
+     * Gets the system flag of this request.
+     *
+     * @return System flag.
+     */
+    public boolean system() {
+        return this.sys;
+    }
+
+    /**
+     * Registers a DHT version that has to be verified on the remote node. The backing map is
+     * created on the first call.
+     *
+     * @param key Key for which version is verified.
+     * @param dhtVer DHT version to check.
+     */
+    public void addDhtVersion(IgniteTxKey<K> key, @Nullable GridCacheVersion dhtVer) {
+        Map<IgniteTxKey<K>, GridCacheVersion> vers = dhtVers;
+
+        if (vers == null) {
+            vers = new HashMap<>();
+
+            dhtVers = vers;
+        }
+
+        vers.put(key, dhtVer);
+    }
+
+    /**
+     * Gets the DHT versions registered for verification.
+     *
+     * @return Map of versions to be verified, or an empty map if none has been set.
+     */
+    public Map<IgniteTxKey<K>, GridCacheVersion> dhtVersions() {
+        if (dhtVers != null)
+            return dhtVers;
+
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Gets the ID of the thread associated with the transaction.
+     *
+     * @return Thread ID.
+     */
+    public long threadId() {
+        return this.threadId;
+    }
+
+    /**
+     * @return Commit version.
+     */
+    public GridCacheVersion commitVersion() { return commitVer; }
+
+    /**
+     * @return Invalidate flag.
+     */
+    public boolean isInvalidate() { return invalidate; }
+
+    /**
+     * Gets the timeout of the transaction being prepared.
+     *
+     * @return Transaction timeout.
+     */
+    public long timeout() {
+        return this.timeout;
+    }
+
+    /**
+     * Gets the concurrency mode of the transaction being prepared.
+     *
+     * @return Concurrency.
+     */
+    public IgniteTxConcurrency concurrency() {
+        return this.concurrency;
+    }
+
+    /**
+     * Gets the isolation level of the transaction being prepared.
+     *
+     * @return Isolation level.
+     */
+    public IgniteTxIsolation isolation() {
+        return this.isolation;
+    }
+
+    /**
+     * Gets the entries read by the transaction.
+     *
+     * @return Read set.
+     */
+    public Collection<IgniteTxEntry<K, V>> reads() {
+        return this.reads;
+    }
+
+    /**
+     * Gets the entries written by the transaction.
+     *
+     * @return Write entries.
+     */
+    public Collection<IgniteTxEntry<K, V>> writes() {
+        return this.writes;
+    }
+
+    /**
+     * Replaces the read set of this request. The serialized form ({@code readsBytes}) is not
+     * touched here; it is rebuilt by {@code prepareMarshal}.
+     *
+     * @param reads Reads.
+     */
+    protected void reads(Collection<IgniteTxEntry<K, V>> reads) {
+        this.reads = reads;
+    }
+
+    /**
+     * Replaces the write entries of this request. The serialized form ({@code writesBytes}) is not
+     * touched here; it is rebuilt by {@code prepareMarshal}.
+     *
+     * @param writes Writes.
+     */
+    protected void writes(Collection<IgniteTxEntry<K, V>> writes) {
+        this.writes = writes;
+    }
+
+    /**
+     * Gets the group lock key of the transaction, if there is one.
+     *
+     * @return Group lock key if preparing group-lock transaction.
+     */
+    @Nullable public IgniteTxKey groupLockKey() {
+        return this.grpLockKey;
+    }
+
+    /**
+     * Gets the partition lock flag.
+     *
+     * @return {@code True} if preparing group-lock transaction with partition lock.
+     */
+    public boolean partitionLock() {
+        return this.partLock;
+    }
+
+    /**
+     * Gets the size the transaction is expected to have.
+     *
+     * @return Expected transaction size.
+     */
+    public int txSize() {
+        return this.txSize;
+    }
+
+    /** {@inheritDoc}
+     * @param ctx*/
+    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        if (writes != null) {
+            marshalTx(writes, ctx);
+
+            writesBytes = new ArrayList<>(writes.size());
+
+            for (IgniteTxEntry<K, V> e : writes)
+                writesBytes.add(ctx.marshaller().marshal(e));
+        }
+
+        if (reads != null) {
+            marshalTx(reads, ctx);
+
+            readsBytes = new ArrayList<>(reads.size());
+
+            for (IgniteTxEntry<K, V> e : reads)
+                readsBytes.add(ctx.marshaller().marshal(e));
+        }
+
+        if (grpLockKey != null && grpLockKeyBytes == null)
+            grpLockKeyBytes = ctx.marshaller().marshal(grpLockKey);
+
+        if (dhtVers != null && dhtVersBytes == null)
+            dhtVersBytes = ctx.marshaller().marshal(dhtVers);
+
+        if (txNodes != null)
+            txNodesBytes = ctx.marshaller().marshal(txNodes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (writesBytes != null) {
+            writes = new ArrayList<>(writesBytes.size());
+
+            for (byte[] arr : writesBytes)
+                writes.add(ctx.marshaller().<IgniteTxEntry<K, 
V>>unmarshal(arr, ldr));
+
+            unmarshalTx(writes, false, ctx, ldr);
+        }
+
+        if (readsBytes != null) {
+            reads = new ArrayList<>(readsBytes.size());
+
+            for (byte[] arr : readsBytes)
+                reads.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, 
ldr));
+
+            unmarshalTx(reads, false, ctx, ldr);
+        }
+
+        if (grpLockKeyBytes != null && grpLockKey == null)
+            grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr);
+
+        if (dhtVersBytes != null && dhtVers == null)
+            dhtVers = ctx.marshaller().unmarshal(dhtVersBytes, ldr);
+
+        if (txNodesBytes != null)
+            txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr);
+    }
+
+    /**
+     * Writes a collection of transaction entries to the given output.
+     * <p>
+     * A collection for which {@code F.isEmpty} returns {@code true} is written as the single
+     * {@code int} {@code -1}. Otherwise the size is written, followed by every entry. While the
+     * invalidate flag is set, each entry is written with its value cleared; the original value
+     * and flags are put back afterwards, even if the write fails.
+     *
+     * @param out Output.
+     * @param col Set to write.
+     * @throws IOException If write failed.
+     */
+    private void writeCollection(ObjectOutput out, Collection<IgniteTxEntry<K, V>> col) throws IOException {
+        if (F.isEmpty(col)) {
+            out.writeInt(-1);
+
+            return;
+        }
+
+        out.writeInt(col.size());
+
+        for (IgniteTxEntry<K, V> entry : col) {
+            // Remember the current state so that it can be restored after the write.
+            V origVal = entry.value();
+            boolean origHasWrite = entry.hasWriteValue();
+            boolean origHasRead = entry.hasReadValue();
+
+            try {
+                // The value is not serialized when invalidate is set to true.
+                if (invalidate)
+                    entry.value(null, false, false);
+
+                out.writeObject(entry);
+            }
+            finally {
+                entry.value(origVal, origHasWrite, origHasRead);
+            }
+        }
+    }
+
+    /**
+     * Reads a collection of transaction entries in the format produced by
+     * {@link #writeCollection(ObjectOutput, Collection)}.
+     * <p>
+     * The input starts with an {@code int}: {@code -1} marks an empty collection, a non-negative
+     * value is the number of entries that follow. Any other negative value cannot have been
+     * produced by the writer and is reported as an {@link IOException} instead of escaping as an
+     * {@link IllegalArgumentException} from the list constructor.
+     *
+     * @param in Input.
+     * @return Deserialized entries; never {@code null} (an empty list for the {@code -1} marker).
+     * @throws IOException If deserialization failed or the stored size is invalid.
+     * @throws ClassNotFoundException If deserialized class could not be found.
+     */
+    @SuppressWarnings({"unchecked"})
+    private Collection<IgniteTxEntry<K, V>> readCollection(ObjectInput in) throws IOException,
+        ClassNotFoundException {
+        int size = in.readInt();
+
+        // -1 is the marker written for an empty collection.
+        if (size == -1)
+            return Collections.emptyList();
+
+        if (size < 0)
+            throw new IOException("Invalid collection size: " + size);
+
+        List<IgniteTxEntry<K, V>> col = new ArrayList<>(size);
+
+        for (int i = 0; i < size; i++)
+            col.add((IgniteTxEntry<K, V>)in.readObject());
+
+        return col;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Creates a new request through the no-argument constructor and lets
+     * {@link #clone0(GridTcpCommunicationMessageAdapter)} copy the state into it.
+     */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors",
+        "OverriddenMethodCallDuringObjectConstruction"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridDistributedTxPrepareRequest copy = new GridDistributedTxPrepareRequest();
+
+        clone0(copy);
+
+        return copy;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the superclass has copied its part, every field of this class is assigned to the
+     * target by reference or value; collections, maps and byte arrays are shared, not copied.
+     */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        super.clone0(_msg);
+
+        GridDistributedTxPrepareRequest copy = (GridDistributedTxPrepareRequest)_msg;
+
+        // Scalar transaction attributes.
+        copy.threadId = this.threadId;
+        copy.concurrency = this.concurrency;
+        copy.isolation = this.isolation;
+        copy.commitVer = this.commitVer;
+        copy.timeout = this.timeout;
+        copy.invalidate = this.invalidate;
+        copy.partLock = this.partLock;
+        copy.txSize = this.txSize;
+        copy.sys = this.sys;
+
+        // Object fields together with their serialized counterparts.
+        copy.reads = this.reads;
+        copy.readsBytes = this.readsBytes;
+        copy.writes = this.writes;
+        copy.writesBytes = this.writesBytes;
+        copy.dhtVers = this.dhtVers;
+        copy.dhtVersBytes = this.dhtVersBytes;
+        copy.grpLockKey = this.grpLockKey;
+        copy.grpLockKeyBytes = this.grpLockKeyBytes;
+        copy.txNodes = this.txNodes;
+        copy.txNodesBytes = this.txNodesBytes;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Writes the superclass part, then the direct type byte (once, guarded by
+     * {@code commState.typeWritten}) and then the fields of this class in the fixed order of the
+     * {@code switch} below. The {@code switch} is entered at {@code commState.idx} and falls
+     * through on purpose, so a call that stopped early resumes at the field that was not written.
+     *
+     * @param buf Buffer to write to.
+     * @return {@code False} as soon as the superclass or any {@code commState.put*} call returns
+     *      {@code false} (presumably because the buffer has no room left - TODO confirm),
+     *      {@code true} once every field has been written.
+     */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        switch (commState.idx) {
+            case 8: // First field of this class; lower indexes are presumably used by the superclass - TODO confirm.
+                if (!commState.putCacheVersion(commitVer))
+                    return false;
+
+                commState.idx++;
+
+            case 9:
+                if (!commState.putEnum(concurrency))
+                    return false;
+
+                commState.idx++;
+
+            case 10:
+                if (!commState.putByteArray(dhtVersBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 11:
+                if (!commState.putByteArray(grpLockKeyBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 12:
+                if (!commState.putBoolean(invalidate))
+                    return false;
+
+                commState.idx++;
+
+            case 13:
+                if (!commState.putEnum(isolation))
+                    return false;
+
+                commState.idx++;
+
+            case 14:
+                if (!commState.putBoolean(partLock))
+                    return false;
+
+                commState.idx++;
+
+            case 15: // readsBytes: element count followed by every array, or -1 when the collection is null.
+                if (readsBytes != null) {
+                    if (commState.it == null) { // Count is written only once; the iterator marks that it is done.
+                        if (!commState.putInt(readsBytes.size()))
+                            return false;
+
+                        commState.it = readsBytes.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) { // cur != NULL: element left over from a call that returned false.
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putByteArray((byte[])commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 16:
+                if (!commState.putLong(threadId))
+                    return false;
+
+                commState.idx++;
+
+            case 17:
+                if (!commState.putLong(timeout))
+                    return false;
+
+                commState.idx++;
+
+            case 18:
+                if (!commState.putByteArray(txNodesBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 19:
+                if (!commState.putInt(txSize))
+                    return false;
+
+                commState.idx++;
+
+            case 20: // writesBytes: same layout as readsBytes in case 15.
+                if (writesBytes != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(writesBytes.size()))
+                            return false;
+
+                        commState.it = writesBytes.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putByteArray((byte[])commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 21:
+                if (!commState.putBoolean(sys))
+                    return false;
+
+                commState.idx++;
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads the superclass part and then the fields of this class in the same order in which
+     * {@link #writeTo(ByteBuffer)} writes them. The {@code switch} is entered at
+     * {@code commState.idx} and falls through on purpose, so a call that stopped early resumes at
+     * the field that was not read. Fixed-size values are read only after {@code buf.remaining()}
+     * shows enough bytes; variable-size values signal an incomplete read through the
+     * {@code *_NOT_READ} sentinels.
+     *
+     * @param buf Buffer to read from.
+     * @return {@code False} if the superclass returned {@code false} or a field could not be read
+     *      from the bytes currently available, {@code true} once every field has been read.
+     */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (commState.idx) {
+            case 8: // First field of this class; lower indexes are presumably used by the superclass - TODO confirm.
+                GridCacheVersion commitVer0 = commState.getCacheVersion();
+
+                if (commitVer0 == CACHE_VER_NOT_READ)
+                    return false;
+
+                commitVer = commitVer0;
+
+                commState.idx++;
+
+            case 9:
+                if (buf.remaining() < 1)
+                    return false;
+
+                byte concurrency0 = commState.getByte();
+
+                concurrency = IgniteTxConcurrency.fromOrdinal(concurrency0);
+
+                commState.idx++;
+
+            case 10:
+                byte[] dhtVersBytes0 = commState.getByteArray();
+
+                if (dhtVersBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                dhtVersBytes = dhtVersBytes0;
+
+                commState.idx++;
+
+            case 11:
+                byte[] grpLockKeyBytes0 = commState.getByteArray();
+
+                if (grpLockKeyBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                grpLockKeyBytes = grpLockKeyBytes0;
+
+                commState.idx++;
+
+            case 12:
+                if (buf.remaining() < 1)
+                    return false;
+
+                invalidate = commState.getBoolean();
+
+                commState.idx++;
+
+            case 13:
+                if (buf.remaining() < 1)
+                    return false;
+
+                byte isolation0 = commState.getByte();
+
+                isolation = IgniteTxIsolation.fromOrdinal(isolation0);
+
+                commState.idx++;
+
+            case 14:
+                if (buf.remaining() < 1)
+                    return false;
+
+                partLock = commState.getBoolean();
+
+                commState.idx++;
+
+            case 15: // readsBytes: element count followed by every array; a count of -1 leaves the field untouched.
+                if (commState.readSize == -1) { // -1 here means the count has not been read yet.
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (readsBytes == null)
+                        readsBytes = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
+                        byte[] _val = commState.getByteArray();
+
+                        if (_val == BYTE_ARR_NOT_READ)
+                            return false;
+
+                        readsBytes.add((byte[])_val);
+
+                        commState.readItems++; // Progress is kept in commState so that a later call continues here.
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+            case 16:
+                if (buf.remaining() < 8)
+                    return false;
+
+                threadId = commState.getLong();
+
+                commState.idx++;
+
+            case 17:
+                if (buf.remaining() < 8)
+                    return false;
+
+                timeout = commState.getLong();
+
+                commState.idx++;
+
+            case 18:
+                byte[] txNodesBytes0 = commState.getByteArray();
+
+                if (txNodesBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                txNodesBytes = txNodesBytes0;
+
+                commState.idx++;
+
+            case 19:
+                if (buf.remaining() < 4)
+                    return false;
+
+                txSize = commState.getInt();
+
+                commState.idx++;
+
+            case 20: // writesBytes: same layout as readsBytes in case 15.
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (writesBytes == null)
+                        writesBytes = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
+                        byte[] _val = commState.getByteArray();
+
+                        if (_val == BYTE_ARR_NOT_READ)
+                            return false;
+
+                        writesBytes.add((byte[])_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+            case 21:
+                if (buf.remaining() < 1)
+                    return false;
+
+                sys = commState.getBoolean();
+
+                commState.idx++;
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return The constant {@code 26}, which {@code writeTo} emits as the type byte of this message.
+     */
+    @Override public byte directType() {
+        final byte type = 26;
+
+        return type;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return 
GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this,
+            "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
new file mode 100644
index 0000000..e809846
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Response to prepare request.
+ */
+public class GridDistributedTxPrepareResponse<K, V> extends 
GridDistributedBaseMessage<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Collections of local lock candidates. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Map<K, Collection<GridCacheMvccCandidate<K>>> cands;
+
+    /** Serialized form of the {@link #cands} map. */
+    private byte[] candsBytes;
+
+    /** Error. */
+    @GridToStringExclude
+    @GridDirectTransient
+    private Throwable err;
+
+    /** Serialized error. */
+    private byte[] errBytes;
+
+    /**
+     * Empty constructor (required by {@link Externalizable}); it is also the one used by
+     * {@link #clone()}.
+     */
+    public GridDistributedTxPrepareResponse() {
+        /* No-op. */
+    }
+
+    /**
+     * Creates a response without an error for the given transaction.
+     *
+     * @param xid Transaction ID.
+     */
+    public GridDistributedTxPrepareResponse(GridCacheVersion xid) {
+        super(xid, 0);
+    }
+
+    /**
+     * Creates a response that carries an error for the given transaction.
+     *
+     * @param xid Transaction ID.
+     * @param err Error.
+     */
+    public GridDistributedTxPrepareResponse(GridCacheVersion xid, Throwable err) {
+        this(xid);
+
+        this.err = err;
+    }
+
+    /**
+     * Gets the error carried by this response.
+     *
+     * @return Error, or {@code null} if none has been set.
+     */
+    public Throwable error() {
+        return this.err;
+    }
+
+    /**
+     * Sets the error carried by this response. A non-{@code null} error makes
+     * {@link #isRollback()} return {@code true}.
+     *
+     * @param err Error to set.
+     */
+    public void error(Throwable err) {
+        this.err = err;
+    }
+
+    /**
+     * Tells whether this response carries an error.
+     *
+     * @return Rollback flag: {@code true} if an error is set.
+     */
+    public boolean isRollback() {
+        return this.err != null;
+    }
+
+    /**
+     * Sets the map of local lock candidates, keyed by cache key.
+     *
+     * @param cands Candidates map to set.
+     */
+    public void candidates(Map<K, Collection<GridCacheMvccCandidate<K>>> cands) {
+        this.cands = cands;
+    }
+
+    /** {@inheritDoc}
+     * @param ctx*/
+    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        if (candsBytes == null && cands != null) {
+            if (ctx.deploymentEnabled()) {
+                for (K k : cands.keySet())
+                    prepareObject(k, ctx);
+            }
+
+            candsBytes = CU.marshal(ctx, cands);
+        }
+
+        if (err != null)
+            errBytes = ctx.marshaller().marshal(err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (candsBytes != null && cands == null)
+            cands = ctx.marshaller().unmarshal(candsBytes, ldr);
+
+        if (errBytes != null)
+            err = ctx.marshaller().unmarshal(errBytes, ldr);
+    }
+
+    /**
+     * Looks up the lock candidates stored for the given key.
+     *
+     * @param key Candidates key; must not be {@code null}.
+     * @return Collection of lock candidates for the given key, or {@code null} if no candidates
+     *      map is set or the map has no mapping for the key.
+     */
+    @Nullable public Collection<GridCacheMvccCandidate<K>> candidatesForKey(K key) {
+        assert key != null;
+
+        return cands == null ? null : cands.get(key);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Creates a new response through the no-argument constructor and lets
+     * {@link #clone0(GridTcpCommunicationMessageAdapter)} copy the state into it.
+     */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors",
+        "OverriddenMethodCallDuringObjectConstruction"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridDistributedTxPrepareResponse copy = new GridDistributedTxPrepareResponse();
+
+        clone0(copy);
+
+        return copy;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the superclass has copied its part, the candidates, the error and their serialized
+     * forms are assigned to the target by reference.
+     */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        super.clone0(_msg);
+
+        GridDistributedTxPrepareResponse copy = (GridDistributedTxPrepareResponse)_msg;
+
+        copy.err = this.err;
+        copy.errBytes = this.errBytes;
+        copy.cands = this.cands;
+        copy.candsBytes = this.candsBytes;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Writes the superclass part, then the direct type byte (once, guarded by
+     * {@code commState.typeWritten}) and then {@code candsBytes} and {@code errBytes}. The
+     * {@code switch} is entered at {@code commState.idx} and falls through on purpose, so a call
+     * that stopped early resumes at the field that was not written.
+     *
+     * @param buf Buffer to write to.
+     * @return {@code False} as soon as the superclass or any {@code commState.put*} call returns
+     *      {@code false} (presumably because the buffer has no room left - TODO confirm),
+     *      {@code true} once every field has been written.
+     */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        switch (commState.idx) {
+            case 8: // First field of this class; lower indexes are presumably used by the superclass - TODO confirm.
+                if (!commState.putByteArray(candsBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 9:
+                if (!commState.putByteArray(errBytes))
+                    return false;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads the superclass part and then {@code candsBytes} and {@code errBytes}, in the order in
+     * which {@link #writeTo(ByteBuffer)} writes them. The {@code switch} is entered at
+     * {@code commState.idx} and falls through on purpose, so a call that stopped early resumes at
+     * the field that was not read.
+     *
+     * @param buf Buffer to read from.
+     * @return {@code False} if the superclass returned {@code false} or a byte array came back as
+     *      {@code BYTE_ARR_NOT_READ}, {@code true} once both fields have been read.
+     */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (commState.idx) {
+            case 8: // First field of this class; lower indexes are presumably used by the superclass - TODO confirm.
+                byte[] candsBytes0 = commState.getByteArray();
+
+                if (candsBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                candsBytes = candsBytes0;
+
+                commState.idx++;
+
+            case 9:
+                byte[] errBytes0 = commState.getByteArray();
+
+                if (errBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                errBytes = errBytes0;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return The constant {@code 27}, which {@code writeTo} emits as the type byte of this message.
+     */
+    @Override public byte directType() {
+        final byte type = 27;
+
+        return type;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return 
GridToStringBuilder.toString(GridDistributedTxPrepareResponse.class, this, 
"err",
+            err == null ? "null" : err.toString(), "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
new file mode 100644
index 0000000..424f543
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -0,0 +1,775 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.transactions.IgniteTxState.*;
+import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
+import static org.apache.ignite.internal.processors.dr.GridDrType.*;
+
+/**
+ * Transaction created by system implicitly on remote nodes.
+ */
+public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
+    implements IgniteTxRemoteEx<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Read set. */
+    @GridToStringInclude
+    protected Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap;
+
+    /** Write map. */
+    @GridToStringInclude
+    protected Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> writeMap;
+
+    /** Remote thread ID. */
+    @GridToStringInclude
+    private long rmtThreadId;
+
+    /** Explicit versions. */
+    @GridToStringInclude
+    private List<GridCacheVersion> explicitVers;
+
+    /** Started flag. */
+    @GridToStringInclude
+    private boolean started;
+
+    /** {@code True} only if all write entries are locked by this transaction. 
*/
+    @GridToStringInclude
+    private AtomicBoolean commitAllowed = new AtomicBoolean(false);
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDistributedTxRemoteAdapter() {
+        // No-op.
+    }
+
+    /**
+     * @param ctx Cache registry.
+     * @param nodeId Node ID.
+     * @param rmtThreadId Remote thread ID.
+     * @param xidVer XID version.
+     * @param commitVer Commit version.
+     * @param sys System flag.
+     * @param concurrency Concurrency level (should be pessimistic).
+     * @param isolation Transaction isolation.
+     * @param invalidate Invalidate flag.
+     * @param timeout Timeout.
+     * @param txSize Expected transaction size.
+     * @param grpLockKey Group lock key if this is a group-lock transaction.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     */
+    public GridDistributedTxRemoteAdapter(
+        GridCacheSharedContext<K, V> ctx,
+        UUID nodeId,
+        long rmtThreadId,
+        GridCacheVersion xidVer,
+        GridCacheVersion commitVer,
+        boolean sys,
+        IgniteTxConcurrency concurrency,
+        IgniteTxIsolation isolation,
+        boolean invalidate,
+        long timeout,
+        int txSize,
+        @Nullable IgniteTxKey grpLockKey,
+        @Nullable UUID subjId,
+        int taskNameHash
+    ) {
+        super(
+            ctx,
+            nodeId,
+            xidVer,
+            ctx.versions().last(),
+            Thread.currentThread().getId(),
+            sys,
+            concurrency,
+            isolation,
+            timeout,
+            txSize,
+            grpLockKey,
+            subjId,
+            taskNameHash);
+
+        this.rmtThreadId = rmtThreadId;
+        this.invalidate = invalidate;
+
+        commitVersion(commitVer);
+
+        // Must set started flag after concurrency and isolation.
+        started = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID eventNodeId() {
+        return nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<UUID> masterNodeIds() {
+        return Collections.singleton(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID originatingNodeId() {
+        return nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Integer> activeCacheIds() {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Checks whether this transaction has neither read nor write entries.
+     * <p>
+     * A map that has not been initialized ({@code null}) is treated as empty, in line with the
+     * {@code null} checks already done by {@link #entry(IgniteTxKey)}.
+     *
+     * @return {@code True} if both the read map and the write map are {@code null} or empty.
+     */
+    @Override public boolean empty() {
+        // F.isEmpty(Map) is null-safe, unlike calling isEmpty() directly on the fields.
+        return F.isEmpty(readMap) && F.isEmpty(writeMap);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean removed(IgniteTxKey<K> key) {
+        // Parameterized type instead of the raw IgniteTxEntry, matching the declared type of writeMap.
+        IgniteTxEntry<K, V> e = writeMap.get(key);
+
+        // The key counts as removed only if it is in the write map with a DELETE operation.
+        return e != null && e.op() == DELETE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void invalidate(boolean invalidate) {
+        this.invalidate = invalidate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> writeMap() {
+        return writeMap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap() {
+        return readMap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void seal() {
+        // No-op.
+    }
+
+    /**
+     * Adds group lock key to remote transaction.
+     *
+     * @param key Key.
+     */
+    public void groupLockKey(IgniteTxKey key) {
+        if (grpLockKey == null)
+            grpLockKey = key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridTuple<V> peek(GridCacheContext<K, V> cacheCtx, boolean failFast, K key,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws GridCacheFilterFailedException {
+        // This method always fails: with an AssertionError when assertions are enabled,
+        // with an IllegalStateException otherwise. Both carry the same message.
+        String msg = "Method peek can only be called on user transaction: " + this;
+
+        assert false : msg;
+
+        throw new IllegalStateException(msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteTxEntry<K, V> entry(IgniteTxKey<K> key) {
+        IgniteTxEntry<K, V> found = null;
+
+        // The write map takes precedence over the read map; either map may be uninitialized.
+        if (writeMap != null)
+            found = writeMap.get(key);
+
+        if (found == null && readMap != null)
+            found = readMap.get(key);
+
+        return found;
+    }
+
+    /**
+     * Clears entry from transaction as it never happened.
+     *
+     * @param key key to be removed.
+     */
+    public void clearEntry(IgniteTxKey<K> key) {
+        readMap.remove(key);
+        writeMap.remove(key);
+    }
+
+    /**
+     * Applies the given completed and pending versions to every entry of this transaction,
+     * read entries first, then write entries. Uninitialized or empty maps are skipped.
+     *
+     * @param baseVer Base version.
+     * @param committedVers Committed versions.
+     * @param rolledbackVers Rolled back versions.
+     * @param pendingVers Pending versions.
+     */
+    @Override public void doneRemote(GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers) {
+        boolean hasReads = readMap != null && !readMap.isEmpty();
+
+        if (hasReads) {
+            for (IgniteTxEntry<K, V> readEntry : readMap.values())
+                doneRemote(readEntry, baseVer, committedVers, rolledbackVers, pendingVers);
+        }
+
+        boolean hasWrites = writeMap != null && !writeMap.isEmpty();
+
+        if (hasWrites) {
+            for (IgniteTxEntry<K, V> writeEntry : writeMap.values())
+                doneRemote(writeEntry, baseVer, committedVers, rolledbackVers, pendingVers);
+        }
+    }
+
+    /**
+     * Adds completed versions to an entry.
+     * <p>
+     * The call is retried in a loop: if the cached entry turns out to be removed, a fresh entry is
+     * obtained from the cache, installed into the transaction entry, and the operation is repeated.
+     *
+     * @param txEntry Entry.
+     * @param baseVer Base version for completed versions.
+     * @param committedVers Completed versions relative to base version.
+     * @param rolledbackVers Rolled back versions relative to base version.
+     * @param pendingVers Pending versions.
+     */
+    private void doneRemote(IgniteTxEntry<K, V> txEntry, GridCacheVersion 
baseVer,
+        Collection<GridCacheVersion> committedVers, 
Collection<GridCacheVersion> rolledbackVers,
+        Collection<GridCacheVersion> pendingVers) {
+        while (true) {
+            GridDistributedCacheEntry<K, V> entry = 
(GridDistributedCacheEntry<K, V>)txEntry.cached();
+
+            try {
+                // Handle explicit locks.
+                // The entry's explicit version, when present, is used instead of this transaction's xidVer.
+                GridCacheVersion doneVer = txEntry.explicitVersion() != null ? 
txEntry.explicitVersion() : xidVer;
+
+                entry.doneRemote(doneVer, baseVer, pendingVers, committedVers, 
rolledbackVers, isSystemInvalidate());
+
+                // Success: leave the retry loop.
+                break;
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                assert entry.obsoleteVersion() != null;
+
+                if (log.isDebugEnabled())
+                    log.debug("Replacing obsolete entry in remote transaction 
[entry=" + entry + ", tx=" + this + ']');
+
+                // Replace the entry.
+                
txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), 
txEntry.keyBytes());
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) {
+        try {
+            // Owner changes for keys outside of the write set do not trigger a commit attempt.
+            if (!hasWriteKey(entry.txKey()))
+                return false;
+
+            commitIfLocked();
+
+            return true;
+        }
+        catch (IgniteCheckedException e) {
+            // The failure is logged rather than propagated; the change is reported as not handled.
+            U.error(log, "Failed to commit remote transaction: " + this, e);
+
+            return false;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStarted() {
+        return started;
+    }
+
+    /**
+     * @return Remote node thread ID.
+     */
+    @Override public long remoteThreadId() {
+        return rmtThreadId;
+    }
+
+    /**
+     * Registers the given entry as a write entry of this transaction.
+     * <p>
+     * If the key is not yet in the write map, the passed entry itself is stored there: it either takes
+     * over the cached entry and key bytes of a matching read entry (which is removed from the read map),
+     * or gets a cached entry obtained from the cache. If the key is already in the write map, the
+     * existing entry is updated in place from the passed one.
+     *
+     * @param e Transaction entry to set.
+     * @return {@code True} if value was set (this implementation always returns {@code true}).
+     */
+    @Override public boolean setWriteValue(IgniteTxEntry<K, V> e) {
+        // NOTE(review): checkInternal is defined outside of this class - presumably validates the key; confirm.
+        checkInternal(e.txKey());
+
+        IgniteTxEntry<K, V> entry = writeMap.get(e.txKey());
+
+        if (entry == null) {
+            // Promote a read entry to a write entry, if there is one for this key.
+            IgniteTxEntry<K, V> rmv = readMap.remove(e.txKey());
+
+            if (rmv != null) {
+                e.cached(rmv.cached(), rmv.keyBytes());
+
+                writeMap.put(e.txKey(), e);
+            }
+            // If lock is explicit.
+            else {
+                e.cached(e.context().cache().entryEx(e.key()), null);
+
+                // explicit lock.
+                writeMap.put(e.txKey(), e);
+            }
+        }
+        else {
+            // Copy values.
+            entry.value(e.value(), e.hasWriteValue(), e.hasReadValue());
+            entry.entryProcessors(e.entryProcessors());
+            entry.valueBytes(e.valueBytes());
+            entry.op(e.op());
+            entry.ttl(e.ttl());
+            entry.explicitVersion(e.explicitVersion());
+            entry.groupLockEntry(e.groupLockEntry());
+
+            // DR stuff.
+            entry.drVersion(e.drVersion());
+            entry.drExpireTime(e.drExpireTime());
+        }
+
+        // Record the explicit version of the passed entry, if it has one.
+        addExplicit(e);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasWriteKey(IgniteTxKey<K> key) {
+        return writeMap.containsKey(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<IgniteTxEx<K, V>> prepareAsync() {
+        assert false;
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<IgniteTxKey<K>> readSet() {
+        return readMap.keySet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<IgniteTxKey<K>> writeSet() {
+        return writeMap.keySet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgniteTxEntry<K, V>> allEntries() {
+        return F.concat(false, writeEntries(), readEntries());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgniteTxEntry<K, V>> writeEntries() {
+        return writeMap.values();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgniteTxEntry<K, V>> readEntries() {
+        return readMap.values();
+    }
+
+    /**
+     * Prepare phase.
+     * <p>
+     * Returns silently if the transaction cannot be switched to {@code PREPARING}, unless it is an
+     * optimistic transaction that is already in that state. A failed prepare marks the transaction
+     * as rollback-only before the exception is rethrown.
+     *
+     * @throws IgniteCheckedException If prepare failed.
+     */
+    @Override public void prepare() throws IgniteCheckedException {
+        // The transition fails if another thread is doing prepare or rollback.
+        // In optimistic mode prepare may be called multiple times, so re-entry is tolerated there.
+        if (!state(PREPARING) && !(state() == PREPARING && optimistic())) {
+            if (log.isDebugEnabled())
+                log.debug("Invalid transaction state for prepare: " + this);
+
+            return;
+        }
+
+        try {
+            cctx.tm().prepareTx(this);
+
+            boolean markPrepared = pessimistic() || isSystemInvalidate();
+
+            if (markPrepared)
+                state(PREPARED);
+        }
+        catch (IgniteCheckedException e) {
+            setRollbackOnly();
+
+            throw e;
+        }
+    }
+
+    /**
+     * Commits this transaction if it is in the {@code COMMITTING} state and every write entry is either
+     * a group-lock entry or locked by this transaction (by the entry's explicit version, if any, or by
+     * {@code xidVer} otherwise). Returns without committing if some lock is not owned yet.
+     * <p>
+     * The {@code commitAllowed} flag ensures that the write phase is executed at most once.
+     *
+     * @throws IgniteCheckedException If commit failed.
+     */
+    @SuppressWarnings({"CatchGenericClass"})
+    private void commitIfLocked() throws IgniteCheckedException {
+        if (state() == COMMITTING) {
+            // Phase 1: check lock ownership for all write entries; bail out if any lock is missing.
+            for (IgniteTxEntry<K, V> txEntry : writeMap.values()) {
+                assert txEntry != null : "Missing transaction entry for tx: " 
+ this;
+
+                while (true) {
+                    GridCacheEntryEx<K, V> cacheEntry = txEntry.cached();
+
+                    assert cacheEntry != null : "Missing cached entry for 
transaction entry: " + txEntry;
+
+                    try {
+                        GridCacheVersion ver = txEntry.explicitVersion() != 
null ? txEntry.explicitVersion() : xidVer;
+
+                        // If locks haven't been acquired yet, keep waiting.
+                        if (!txEntry.groupLockEntry() && 
!cacheEntry.lockedBy(ver)) {
+                            if (log.isDebugEnabled())
+                                log.debug("Transaction does not own lock for 
entry (will wait) [entry=" + cacheEntry +
+                                    ", tx=" + this + ']');
+
+                            return;
+                        }
+
+                        break; // While.
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got removed entry while committing 
(will retry): " + txEntry);
+
+                        
txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), 
txEntry.keyBytes());
+                    }
+                }
+            }
+
+            // Phase 2: apply the entries.
+            // Only one thread gets to commit.
+            if (commitAllowed.compareAndSet(false, true)) {
+                IgniteCheckedException err = null;
+
+                if (!F.isEmpty(writeMap)) {
+                    // Register this transaction as completed prior to 
write-phase to
+                    // ensure proper lock ordering for removed entries.
+                    cctx.tm().addCommittedTx(this);
+
+                    long topVer = topologyVersion();
+
+                    // Note that for near transactions we grab all entries.
+                    for (IgniteTxEntry<K, V> txEntry : (near() ? allEntries() 
: writeEntries())) {
+                        GridCacheContext<K, V> cacheCtx = txEntry.context();
+
+                        boolean replicate = cacheCtx.isDrEnabled();
+
+                        try {
+                            while (true) {
+                                try {
+                                    GridCacheEntryEx<K, V> cached = 
txEntry.cached();
+
+                                    if (cached == null)
+                                        txEntry.cached(cached = 
cacheCtx.cache().entryEx(txEntry.key()), null);
+
+                                    if (near() && 
cacheCtx.dr().receiveEnabled()) {
+                                        cached.markObsolete(xidVer);
+
+                                        break;
+                                    }
+
+                                    GridNearCacheEntry<K, V> nearCached = null;
+
+                                    if (updateNearCache(cacheCtx, 
txEntry.key(), topVer))
+                                        nearCached = 
cacheCtx.dht().near().peekExx(txEntry.key());
+
+                                    if (!F.isEmpty(txEntry.entryProcessors()) 
|| !F.isEmpty(txEntry.filters()))
+                                        txEntry.cached().unswap(true, false);
+
+                                    GridTuple3<GridCacheOperation, V, byte[]> 
res = applyTransformClosures(txEntry,
+                                        false);
+
+                                    GridCacheOperation op = res.get1();
+                                    V val = res.get2();
+                                    byte[] valBytes = res.get3();
+
+                                    GridCacheVersion explicitVer = 
txEntry.drVersion();
+
+                                    if (finalizationStatus() == 
FinalizationStatus.RECOVERY_FINISH || optimistic()) {
+                                        // Primary node has left the grid so 
we have to process conflicts on backups.
+                                        if (explicitVer == null)
+                                            explicitVer = writeVersion(); // 
Force write version to be used.
+
+                                        GridDrResolveResult<V> drRes = 
cacheCtx.dr().resolveTx(cached,
+                                            txEntry,
+                                            explicitVer,
+                                            op,
+                                            val,
+                                            valBytes,
+                                            txEntry.ttl(),
+                                            txEntry.drExpireTime());
+
+                                        if (drRes != null) {
+                                            op = drRes.operation();
+                                            val = drRes.value();
+                                            valBytes = drRes.valueBytes();
+
+                                            if (drRes.isMerge())
+                                                explicitVer = writeVersion();
+                                            else if (op == NOOP)
+                                                txEntry.ttl(-1L);
+                                        }
+                                        else
+                                            // Nullify explicit version so 
that innerSet/innerRemove will work as usual.
+                                            explicitVer = null;
+                                    }
+
+                                    if (op == CREATE || op == UPDATE) {
+                                        // Invalidate only for near nodes 
(backups cannot be invalidated).
+                                        if (isSystemInvalidate() || 
(isInvalidate() && cacheCtx.isNear()))
+                                            cached.innerRemove(this, 
eventNodeId(), nodeId, false, false, true, true,
+                                                topVer, txEntry.filters(), 
replicate ? DR_BACKUP : DR_NONE,
+                                                near() ? null : explicitVer, 
CU.subjectId(this, cctx),
+                                                resolveTaskName());
+                                        else {
+                                            cached.innerSet(this, 
eventNodeId(), nodeId, val, valBytes, false, false,
+                                                txEntry.ttl(), true, true, 
topVer, txEntry.filters(),
+                                                replicate ? DR_BACKUP : 
DR_NONE, txEntry.drExpireTime(),
+                                                near() ? null : explicitVer, 
CU.subjectId(this, cctx),
+                                                resolveTaskName());
+
+                                            // Keep near entry up to date.
+                                            if (nearCached != null) {
+                                                V val0 = null;
+                                                byte[] valBytes0 = null;
+
+                                                GridCacheValueBytes 
valBytesTuple = cached.valueBytes();
+
+                                                if (!valBytesTuple.isNull()) {
+                                                    if 
(valBytesTuple.isPlain())
+                                                        val0 = 
(V)valBytesTuple.get();
+                                                    else
+                                                        valBytes0 = 
valBytesTuple.get();
+                                                }
+                                                else
+                                                    val0 = cached.rawGet();
+
+                                                
nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(),
+                                                    cached.ttl(), nodeId);
+                                            }
+                                        }
+                                    }
+                                    else if (op == DELETE) {
+                                        cached.innerRemove(this, 
eventNodeId(), nodeId, false, false, true, true,
+                                            topVer, txEntry.filters(), 
replicate ? DR_BACKUP : DR_NONE,
+                                            near() ? null : explicitVer, 
CU.subjectId(this, cctx), resolveTaskName());
+
+                                        // Keep near entry up to date.
+                                        if (nearCached != null)
+                                            nearCached.updateOrEvict(xidVer, 
null, null, 0, 0, nodeId);
+                                    }
+                                    else if (op == RELOAD) {
+                                        V reloaded = cached.innerReload(CU.<K, 
V>empty());
+
+                                        if (nearCached != null) {
+                                            nearCached.innerReload(CU.<K, 
V>empty());
+
+                                            
nearCached.updateOrEvict(cached.version(), reloaded, null,
+                                                cached.expireTime(), 
cached.ttl(), nodeId);
+                                        }
+                                    }
+                                    else if (op == READ) {
+                                        assert near();
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Ignoring READ entry 
when committing: " + txEntry);
+                                    }
+                                    // No-op.
+                                    else {
+                                        assert !groupLock() || 
txEntry.groupLockEntry() || ownsLock(txEntry.cached()):
+                                            "Transaction does not own lock for 
group lock entry during  commit [tx=" +
+                                                this + ", txEntry=" + txEntry 
+ ']';
+
+                                        if (txEntry.ttl() != -1L)
+                                            cached.updateTtl(null, 
txEntry.ttl());
+
+                                        if (nearCached != null) {
+                                            V val0 = null;
+                                            byte[] valBytes0 = null;
+
+                                            GridCacheValueBytes valBytesTuple 
= cached.valueBytes();
+
+                                            if (!valBytesTuple.isNull()) {
+                                                if (valBytesTuple.isPlain())
+                                                    val0 = 
(V)valBytesTuple.get();
+                                                else
+                                                    valBytes0 = 
valBytesTuple.get();
+                                            }
+                                            else
+                                                val0 = cached.rawGet();
+
+                                            nearCached.updateOrEvict(xidVer, 
val0, valBytes0, cached.expireTime(),
+                                                cached.ttl(), nodeId);
+                                        }
+                                    }
+
+                                    // Assert after setting values as we want 
to make sure
+                                    // that if we replaced removed entries.
+                                    assert
+                                        txEntry.op() == READ || 
onePhaseCommit() ||
+                                            // If candidate is not there, then 
lock was explicit
+                                            // and we simply allow the commit 
to proceed.
+                                            
!cached.hasLockCandidateUnsafe(xidVer) || cached.lockedByUnsafe(xidVer) :
+                                        "Transaction does not own lock for 
commit [entry=" + cached +
+                                            ", tx=" + this + ']';
+
+                                    // Break out of while loop.
+                                    break;
+                                }
+                                catch (GridCacheEntryRemovedException ignored) 
{
+                                    if (log.isDebugEnabled())
+                                        log.debug("Attempting to commit a 
removed entry (will retry): " + txEntry);
+
+                                    // Renew cached entry.
+                                    
txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), txEntry.keyBytes());
+                                }
+                            }
+                        }
+                        catch (Throwable ex) {
+                            state(UNKNOWN);
+
+                            // In case of error, we still make the best effort 
to commit,
+                            // as there is no way to rollback at this point.
+                            // NOTE(review): err is overwritten on every failure, so only the last one is rethrown.
+                            err = ex instanceof IgniteCheckedException ? 
(IgniteCheckedException)ex :
+                                new IgniteCheckedException("Commit produced a 
runtime exception: " + this, ex);
+                        }
+                    }
+                }
+
+                // A failure while applying entries leaves the transaction in the UNKNOWN state.
+                if (err != null) {
+                    state(UNKNOWN);
+
+                    throw err;
+                }
+
+                cctx.tm().commitTx(this);
+
+                state(COMMITTED);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void commit() throws IgniteCheckedException {
+        // Optimistic transactions are moved to PREPARED here, before the COMMITTING transition is attempted.
+        if (optimistic())
+            state(PREPARED);
+
+        if (!state(COMMITTING)) {
+            // Snapshot the state once so that the check and the error message agree.
+            IgniteTxState state = state();
+
+            // If other thread is doing commit, then no-op.
+            if (state == COMMITTING || state == COMMITTED)
+                return;
+
+            if (log.isDebugEnabled())
+                log.debug("Failed to set COMMITTING transaction state (will 
rollback): " + this);
+
+            setRollbackOnly();
+
+            // The failed transition is reported to the caller unless this is a system invalidation,
+            // in which case the transaction is rolled back without an exception.
+            if (!isSystemInvalidate())
+                throw new IgniteCheckedException("Invalid transaction state 
for commit [state=" + state + ", tx=" + this + ']');
+
+            rollback();
+        }
+
+        // commitIfLocked() does nothing unless the state is COMMITTING.
+        commitIfLocked();
+    }
+
+    /**
+     * Forces commit for this tx.
+     *
+     * @throws IgniteCheckedException If commit failed.
+     */
+    public void forceCommit() throws IgniteCheckedException {
+        commitIfLocked();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<IgniteTx> commitAsync() {
+        try {
+            commit();
+
+            return new GridFinishedFutureEx<IgniteTx>(this);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFutureEx<>(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CatchGenericClass"})
+    @Override public void rollback() {
+        try {
+            // Nothing to do if the transaction cannot be switched to ROLLING_BACK.
+            if (!state(ROLLING_BACK))
+                return;
+
+            // Near entries are not evicted here: their corresponding transactions delete them.
+            cctx.tm().rollbackTx(this);
+
+            state(ROLLED_BACK);
+        }
+        catch (RuntimeException | Error e) {
+            // An unexpected failure leaves the outcome undefined; record that and rethrow.
+            state(UNKNOWN);
+
+            throw e;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<IgniteTx> rollbackAsync() {
+        rollback();
+
+        return new GridFinishedFutureEx<IgniteTx>(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<GridCacheVersion> alternateVersions() {
+        // The list is created lazily, so report "no versions" as an empty list instead of null.
+        if (explicitVers == null)
+            return Collections.<GridCacheVersion>emptyList();
+
+        return explicitVers;
+    }
+
+    /**
+     * Records the explicit version of the given entry, if it has one, and registers it with the
+     * transaction manager as an alternate version of this transaction. A version that was
+     * already recorded is ignored.
+     *
+     * @param e Transaction entry.
+     */
+    protected void addExplicit(IgniteTxEntry<K, V> e) {
+        // Entries without an explicit version have nothing to record.
+        if (e.explicitVersion() == null)
+            return;
+
+        // The list is created on first use.
+        if (explicitVers == null)
+            explicitVers = new LinkedList<>();
+
+        // Each explicit version is recorded and registered only once.
+        if (explicitVers.contains(e.explicitVersion()))
+            return;
+
+        explicitVers.add(e.explicitVersion());
+
+        if (log.isDebugEnabled())
+            log.debug("Added explicit version to transaction [explicitVer=" + e.explicitVersion() +
+                ", tx=" + this + ']');
+
+        // Register alternate version with TM.
+        cctx.tm().addAlternateVersion(e.explicitVersion(), this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return 
GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, 
"super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
new file mode 100644
index 0000000..9e475d3
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Unlock request message.
+ * <p>
+ * Holds the keys to unlock both as objects ({@link #keys()}) and in serialized form ({@link #keyBytes()}).
+ * Only the serialized form is written by {@link #writeTo(ByteBuffer)}; the objects are rebuilt from it in
+ * {@link #finishUnmarshal(GridCacheSharedContext, ClassLoader)}.
+ */
+public class GridDistributedUnlockRequest<K, V> extends 
GridDistributedBaseMessage<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Serialized keys to unlock. */
+    @GridDirectCollection(byte[].class)
+    private List<byte[]> keyBytes;
+
+    /** Keys to unlock (not written by {@link #writeTo(ByteBuffer)}). */
+    @GridDirectTransient
+    private List<K> keys;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridDistributedUnlockRequest() {
+        /* No-op. */
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param keyCnt Key count (passed to the parent message).
+     */
+    public GridDistributedUnlockRequest(int cacheId, int keyCnt) {
+        super(keyCnt);
+
+        this.cacheId = cacheId;
+    }
+
+    /**
+     * @return Serialized keys to unlock, or {@code null} if none were added or read.
+     */
+    public List<byte[]> keyBytes() {
+        return keyBytes;
+    }
+
+    /**
+     * @return Keys to unlock, or {@code null} if none were added or unmarshalled.
+     */
+    public List<K> keys() {
+        return keys;
+    }
+
+    /**
+     * Adds a key to unlock together with its serialized form.
+     *
+     * @param key Key.
+     * @param bytes Key bytes.
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void addKey(K key, byte[] bytes, GridCacheContext<K, V> ctx) throws 
IgniteCheckedException {
+        boolean depEnabled = ctx.deploymentEnabled();
+
+        // The key is passed through prepareObject() only when deployment is enabled.
+        if (depEnabled)
+            prepareObject(key, ctx.shared());
+
+        // Both lists are created lazily and sized by the key count of this message.
+        if (keys == null)
+            keys = new ArrayList<>(keysCount());
+
+        keys.add(key);
+
+        if (keyBytes == null)
+            keyBytes = new ArrayList<>(keysCount());
+
+        keyBytes.add(bytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        // Marshal the keys only when there is no serialized form yet.
+        if (F.isEmpty(keyBytes) && !F.isEmpty(keys))
+            keyBytes = marshalCollection(keys, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        // Rebuild key objects from the received bytes unless they are already present.
+        if (keys == null && !F.isEmpty(keyBytes))
+            keys = unmarshalCollection(keyBytes, ctx, ldr);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors",
+        "OverriddenMethodCallDuringObjectConstruction"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        // A new instance is created through the empty constructor and filled by clone0().
+        GridDistributedUnlockRequest _clone = new 
GridDistributedUnlockRequest();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        super.clone0(_msg);
+
+        GridDistributedUnlockRequest _clone = 
(GridDistributedUnlockRequest)_msg;
+
+        // The lists are shared with the clone by reference, not copied.
+        _clone.keyBytes = keyBytes;
+        _clone.keys = keys;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        // Parent fields go first; 'false' means the write did not complete and has to be resumed later.
+        if (!super.writeTo(buf))
+            return false;
+
+        // The message type byte is written at most once across resumed calls.
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        // commState.idx is the index of the field to write next; keyBytes is field 8.
+        // NOTE(review): indexes 0-7 are presumably consumed by the parent classes - confirm.
+        switch (commState.idx) {
+            case 8:
+                if (keyBytes != null) {
+                    // A null iterator means the size prefix has not been written yet.
+                    if (commState.it == null) {
+                        if (!commState.putInt(keyBytes.size()))
+                            return false;
+
+                        commState.it = keyBytes.iterator();
+                    }
+
+                    // commState.cur keeps the element whose write did not complete, so that a resumed
+                    // call retries it; NULL marks that no element is pending.
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putByteArray((byte[])commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    // A null collection is encoded as size -1.
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        // Parent fields go first; 'false' means the read did not complete and has to be resumed later.
+        if (!super.readFrom(buf))
+            return false;
+
+        // Mirror of writeTo(): keyBytes is field 8.
+        switch (commState.idx) {
+            case 8:
+                // readSize == -1 means the size prefix has not been read yet.
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                // A size of -1 on the wire stands for a null collection; keyBytes is left untouched then.
+                if (commState.readSize >= 0) {
+                    if (keyBytes == null)
+                        keyBytes = new ArrayList<>(commState.readSize);
+
+                    // readItems counts the elements already read, so a resumed call continues where it stopped.
+                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
+                        byte[] _val = commState.getByteArray();
+
+                        // Sentinel: the array could not be read from this buffer.
+                        if (_val == BYTE_ARR_NOT_READ)
+                            return false;
+
+                        keyBytes.add((byte[])_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                // Reset the collection-reading state before moving on to the next field.
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        // Type byte that writeTo() puts into the buffer for this message.
+        return 28;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDistributedUnlockRequest.class, this, 
"keyBytesSize",
+            keyBytes == null ? 0 : keyBytes.size(), "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridPartitionedCacheEntryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridPartitionedCacheEntryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridPartitionedCacheEntryImpl.java
new file mode 100644
index 0000000..63eb41f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridPartitionedCacheEntryImpl.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.GridCachePeekMode.*;
+
+/**
+ * Partitioned cache entry public API.
+ * <p>
+ * Peek and metadata operations are applied to the DHT entry and, when the cache context is a near one,
+ * to the near entry as well.
+ */
+public class GridPartitionedCacheEntryImpl<K, V> extends GridCacheEntryImpl<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridPartitionedCacheEntryImpl() {
+        // No-op.
+    }
+
+    /**
+     * @param nearPrj Parent projection or {@code null} if entry belongs to default cache.
+     * @param ctx Near cache context.
+     * @param key key.
+     * @param cached Cached entry (either from near or dht cache map).
+     */
+    public GridPartitionedCacheEntryImpl(GridCacheProjectionImpl<K, V> nearPrj, GridCacheContext<K, V> ctx, K key,
+        @Nullable GridCacheEntryEx<K, V> cached) {
+        super(nearPrj, ctx, key, cached);
+
+        assert !this.ctx.isDht() || ctx.isColocated();
+    }
+
+    /**
+     * @return Dht cache.
+     */
+    public GridDhtCacheAdapter<K, V> dht() {
+        return ctx.isColocated() ? ctx.colocated() : ctx.isDhtAtomic() ? ctx.dht() : ctx.near().dht();
+    }
+
+    /**
+     * @return Near cache.
+     */
+    public GridNearCacheAdapter<K, V> near() {
+        return ctx.near();
+    }
+
+    /** {@inheritDoc} */
+    @Override public V peek(@Nullable Collection<GridCachePeekMode> modes) throws IgniteCheckedException {
+        // 'modes' is declared @Nullable, so it must not be dereferenced unchecked: null is treated the
+        // same way as a collection containing neither NEAR_ONLY nor PARTITIONED_ONLY.
+        if (modes != null && modes.contains(NEAR_ONLY) && ctx.isNear())
+            return peekNear0(modes, CU.<K, V>empty());
+
+        V val = null;
+
+        // NOTE(review): super.peek() is assumed to accept null, as the @Nullable contract declares - confirm.
+        if (modes == null || !modes.contains(PARTITIONED_ONLY))
+            val = super.peek(modes);
+
+        // Fall back to the DHT entry when nothing was found (peekDht0 maps null/empty modes to SMART).
+        if (val == null)
+            val = peekDht0(modes, CU.<K, V>empty());
+
+        return val;
+    }
+
+    /**
+     * Peeks the value from the DHT cache in {@code SMART} mode.
+     *
+     * @param filter Filter.
+     * @return Peeked value.
+     */
+    @Nullable public V peekDht(@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+        try {
+            return peekDht0(SMART, filter);
+        }
+        catch (IgniteCheckedException e) {
+            // Should never happen.
+            throw new IgniteException("Unable to perform entry peek() operation.", e);
+        }
+    }
+
+    /**
+     * Peeks the near cache with each of the given modes in turn and returns the first non-null value.
+     *
+     * @param modes Peek modes ({@code null} or empty means {@code SMART}).
+     * @param filter Optional entry filter.
+     * @return Peeked value.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private V peekNear0(@Nullable Collection<GridCachePeekMode> modes,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+        if (F.isEmpty(modes))
+            return peekNear0(SMART, filter);
+
+        assert modes != null;
+
+        for (GridCachePeekMode mode : modes) {
+            V val = peekNear0(mode, filter);
+
+            if (val != null)
+                return val;
+        }
+
+        return null;
+    }
+
+    /**
+     * Peeks the near cache entry with a single mode.
+     *
+     * @param mode Peek mode ({@code null} means {@code SMART}).
+     * @param filter Optional entry filter.
+     * @return Peeked value.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings({"unchecked"})
+    @Nullable private V peekNear0(@Nullable GridCachePeekMode mode,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+        if (mode == null)
+            mode = SMART;
+
+        // Retry for as long as the entry turns out to be removed while it is being peeked.
+        while (true) {
+            GridCacheProjectionImpl<K, V> prjPerCall = proxy.gateProjection();
+
+            // The projection predicate is combined with the caller's filter.
+            if (prjPerCall != null)
+                filter = ctx.vararg(F0.and(ctx.vararg(proxy.predicate()), filter));
+
+            GridCacheProjectionImpl<K, V> prev = ctx.gate().enter(prjPerCall);
+
+            try {
+                GridCacheEntryEx<K, V> entry = near().peekEx(key);
+
+                return entry == null ? null : ctx.cloneOnFlag(entry.peek(mode, filter));
+            }
+            catch (GridCacheEntryRemovedException ignore) {
+                // No-op.
+            }
+            finally {
+                // The gate is left on every iteration, including the retried ones.
+                ctx.gate().leave(prev);
+            }
+        }
+    }
+
+    /**
+     * Peeks the DHT cache with each of the given modes in turn and returns the first non-null value.
+     *
+     * @param modes Peek modes ({@code null} or empty means {@code SMART}).
+     * @param filter Optional entry filter.
+     * @return Peeked value.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private V peekDht0(@Nullable Collection<GridCachePeekMode> modes,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+        if (F.isEmpty(modes))
+            return peekDht0(SMART, filter);
+
+        assert modes != null;
+
+        for (GridCachePeekMode mode : modes) {
+            V val = peekDht0(mode, filter);
+
+            if (val != null)
+                return val;
+        }
+
+        return null;
+    }
+
+    /**
+     * Peeks the DHT cache entry with a single mode.
+     *
+     * @param mode Peek mode ({@code null} means {@code SMART}).
+     * @param filter Optional entry filter.
+     * @return Peeked value.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings({"unchecked"})
+    @Nullable private V peekDht0(@Nullable GridCachePeekMode mode,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+        if (mode == null)
+            mode = SMART;
+
+        // Retry for as long as the entry turns out to be removed while it is being peeked.
+        while (true) {
+            GridCacheProjectionImpl<K, V> prjPerCall = proxy.gateProjection();
+
+            // The projection predicate is combined with the caller's filter.
+            if (prjPerCall != null)
+                filter = ctx.vararg(F0.and(ctx.vararg(proxy.predicate()), filter));
+
+            GridCacheProjectionImpl<K, V> prev = ctx.gate().enter(prjPerCall);
+
+            try {
+                GridCacheEntryEx<K, V> entry = dht().peekEx(key);
+
+                if (entry == null)
+                    return null;
+                else {
+                    GridTuple<V> peek = entry.peek0(false, mode, filter, ctx.tm().localTxx());
+
+                    return peek != null ? ctx.cloneOnFlag(peek.get()) : null;
+                }
+            }
+            catch (GridCacheEntryRemovedException ignore) {
+                // No-op.
+            }
+            catch (GridCacheFilterFailedException e) {
+                // Treated as impossible here: trips the assertion when assertions are enabled,
+                // otherwise the peek silently yields null.
+                // NOTE(review): printStackTrace() bypasses the logger - consider logging instead.
+                e.printStackTrace();
+
+                assert false;
+
+                return null;
+            }
+            finally {
+                // The gate is left on every iteration, including the retried ones.
+                ctx.gate().leave(prev);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheEntryEx<K, V> entryEx(boolean touch, long topVer) {
+        try {
+            // Local keys come from the DHT cache, other keys from the near cache if there is one,
+            // and from a detached entry otherwise.
+            return ctx.affinity().localNode(key, topVer) ? dht().entryEx(key, touch) :
+                ctx.isNear() ? near().entryEx(key, touch) :
+                    new GridDhtDetachedCacheEntry<>(ctx, key, 0, null, null, 0, 0);
+        }
+        catch (GridDhtInvalidPartitionException ignore) {
+            // NOTE(review): unlike the branch above, 'touch' is not forwarded here - confirm this is intended.
+            return ctx.isNear() ? near().entryEx(key) :
+                new GridDhtDetachedCacheEntry<>(ctx, key, 0, null, null, 0, 0);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheEntryEx<K, V> peekEx(long topVer) {
+        try {
+            return ctx.affinity().localNode(key, topVer) ? dht().peekEx(key) :
+                ctx.isNear() ? near().peekEx(key) : null;
+        }
+        catch (GridDhtInvalidPartitionException ignore) {
+            return ctx.isNear() ? near().peekEx(key) : null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <V1> V1 addMeta(String name, V1 val) {
+        V1 v = null;
+
+        GridDhtCacheEntry<K, V> de = dht().peekExx(key);
+
+        if (de != null)
+            v = de.addMeta(name, val);
+
+        if (ctx.isNear()) {
+            // Without a DHT entry the near entry is obtained through entryExx() rather than only peeked.
+            GridNearCacheEntry<K, V> ne = de != null ? near().peekExx(key) :
+                near().entryExx(key, ctx.affinity().affinityTopologyVersion());
+
+            if (ne != null) {
+                V1 v1 = ne.addMeta(name, val);
+
+                // The DHT result wins; the near result is used only when the DHT one is null.
+                if (v == null)
+                    v = v1;
+            }
+        }
+
+        return v;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings( {"RedundantCast"})
+    @Override public <V1> V1 meta(String name) {
+        V1 v = null;
+
+        GridDhtCacheEntry<K, V> de = dht().peekExx(key);
+
+        if (de != null)
+            v = (V1)de.meta(name);
+
+        if (ctx.isNear()) {
+            GridNearCacheEntry<K, V> ne = near().peekExx(key);
+
+            if (ne != null) {
+                V1 v1 = (V1)ne.meta(name);
+
+                // The DHT result wins; the near result is used only when the DHT one is null.
+                if (v == null)
+                    v = v1;
+            }
+        }
+
+        return v;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings( {"RedundantCast"})
+    @Override public <V1> V1 putMetaIfAbsent(String name, Callable<V1> c) {
+        V1 v = null;
+
+        GridDhtCacheEntry<K, V> de = dht().peekExx(key);
+
+        if (de != null)
+            v = (V1)de.putMetaIfAbsent(name, c);
+
+        if (ctx.isNear()) {
+            // Without a DHT entry the near entry is obtained through entryExx() rather than only peeked.
+            GridNearCacheEntry<K, V> ne = de != null ? near().peekExx(key) :
+                near().entryExx(key, ctx.affinity().affinityTopologyVersion());
+
+            if (ne != null) {
+                V1 v1 = (V1)ne.putMetaIfAbsent(name, c);
+
+                // The DHT result wins; the near result is used only when the DHT one is null.
+                if (v == null)
+                    v = v1;
+            }
+        }
+
+        return v;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings( {"RedundantCast"})
+    @Override public <V1> V1 putMetaIfAbsent(String name, V1 val) {
+        V1 v = null;
+
+        GridDhtCacheEntry<K, V> de = dht().peekExx(key);
+
+        if (de != null)
+            v = (V1)de.putMetaIfAbsent(name, val);
+
+        // The near cache is touched only for a near context, exactly as in the Callable overload and
+        // in every other metadata method of this class.
+        if (ctx.isNear()) {
+            // Without a DHT entry the near entry is obtained through entryExx() rather than only peeked.
+            GridNearCacheEntry<K, V> ne = de != null ? near().peekExx(key) :
+                near().entryExx(key, ctx.affinity().affinityTopologyVersion());
+
+            if (ne != null) {
+                V1 v1 = (V1)ne.putMetaIfAbsent(name, val);
+
+                // The DHT result wins; the near result is used only when the DHT one is null.
+                if (v == null)
+                    v = v1;
+            }
+        }
+
+        return v;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings( {"RedundantCast"})
+    @Override public <V1> V1 removeMeta(String name) {
+        V1 v = null;
+
+        GridDhtCacheEntry<K, V> de = dht().peekExx(key);
+
+        if (de != null)
+            v = (V1)de.removeMeta(name);
+
+        if (ctx.isNear()) {
+            GridNearCacheEntry<K, V> ne = near().peekExx(key);
+
+            if (ne != null) {
+                V1 v1 = (V1)ne.removeMeta(name);
+
+                // The DHT result wins; the near result is used only when the DHT one is null.
+                if (v == null)
+                    v = v1;
+            }
+        }
+
+        return v;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <V1> boolean removeMeta(String name, V1 val) {
+        boolean b = false;
+
+        GridDhtCacheEntry<K, V> de = dht().peekExx(key);
+
+        if (de != null)
+            b = de.removeMeta(name, val);
+
+        if (ctx.isNear()) {
+            GridNearCacheEntry<K, V> ne = near().peekExx(key);
+
+            // True if the metadata was removed from at least one of the two entries.
+            if (ne != null)
+                b |= ne.removeMeta(name, val);
+        }
+
+        return b;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <V1> boolean replaceMeta(String name, V1 curVal, V1 newVal) {
+        boolean b = false;
+
+        GridDhtCacheEntry<K, V> de = dht().peekExx(key);
+
+        if (de != null)
+            b = de.replaceMeta(name, curVal, newVal);
+
+        if (ctx.isNear()) {
+            GridNearCacheEntry<K, V> ne = near().peekExx(key);
+
+            // True if the metadata was replaced in at least one of the two entries.
+            if (ne != null)
+                b |= ne.replaceMeta(name, curVal, newVal);
+        }
+
+        return b;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridPartitionedCacheEntryImpl.class, this, super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java
new file mode 100644
index 0000000..e9b748c
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.expiry.*;
+import java.io.*;
+import java.util.concurrent.*;
+
+/**
+ * Externalizable wrapper for {@link ExpiryPolicy}.
+ * <p>
+ * An instance created with {@link #IgniteExternalizableExpiryPolicy(ExpiryPolicy)} delegates to the wrapped
+ * policy. An instance restored by {@link #readExternal(ObjectInput)} has no wrapped policy and serves the
+ * durations that were read from the stream. The wrapped policy itself is never written: only the three
+ * durations are transferred, each as a TTL in milliseconds.
+ */
+public class IgniteExternalizableExpiryPolicy implements ExpiryPolicy, Externalizable, IgniteOptimizedMarshallable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"})
+    private static Object GG_CLASS_ID;
+
+    /** Wire value standing for {@link Duration#ETERNAL}. */
+    private static final long TTL_ETERNAL = 0L;
+
+    /** Smallest wire value of a non-eternal duration; keeps zero and sub-millisecond durations from reading as eternal. */
+    private static final long TTL_MINIMUM = 1L;
+
+    /** Wrapped policy; {@code null} for an instance restored by {@link #readExternal(ObjectInput)}. */
+    private ExpiryPolicy plc;
+
+    /** Flag bit: duration for creation is present in the stream. */
+    private static final byte CREATE_TTL_MASK = 0x01;
+
+    /** Flag bit: duration for update is present in the stream. */
+    private static final byte UPDATE_TTL_MASK = 0x02;
+
+    /** Flag bit: duration for access is present in the stream. */
+    private static final byte ACCESS_TTL_MASK = 0x04;
+
+    /** Duration for creation read from the stream. */
+    private Duration forCreate;
+
+    /** Duration for update read from the stream. */
+    private Duration forUpdate;
+
+    /** Duration for access read from the stream. */
+    private Duration forAccess;
+
+    /**
+     * Required by {@link Externalizable}.
+     */
+    public IgniteExternalizableExpiryPolicy() {
+        // No-op.
+    }
+
+    /**
+     * @param plc Expiry policy.
+     */
+    public IgniteExternalizableExpiryPolicy(ExpiryPolicy plc) {
+        assert plc != null;
+
+        this.plc = plc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object ggClassId() {
+        return GG_CLASS_ID;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Duration getExpiryForCreation() {
+        // Delegate while the wrapped policy is available, otherwise serve the deserialized value.
+        return plc != null ? plc.getExpiryForCreation() : forCreate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Duration getExpiryForAccess() {
+        // Delegate while the wrapped policy is available, otherwise serve the deserialized value.
+        return plc != null ? plc.getExpiryForAccess() : forAccess;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Duration getExpiryForUpdate() {
+        // Delegate while the wrapped policy is available, otherwise serve the deserialized value.
+        return plc != null ? plc.getExpiryForUpdate() : forUpdate;
+    }
+
+    /**
+     * Writes the duration as a TTL in milliseconds; nothing is written for a {@code null} duration
+     * (its absence is signalled by the flags byte).
+     *
+     * @param out Output stream.
+     * @param duration Duration.
+     * @throws IOException If failed.
+     */
+    private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException {
+        if (duration == null)
+            return;
+
+        if (duration.isEternal()) {
+            out.writeLong(TTL_ETERNAL);
+
+            return;
+        }
+
+        long ttl = duration.getTimeUnit().toMillis(duration.getDurationAmount());
+
+        // toMillis() truncates, so a zero or sub-millisecond duration would otherwise be written as 0
+        // and read back as ETERNAL.
+        out.writeLong(Math.max(TTL_MINIMUM, ttl));
+    }
+
+    /**
+     * Reads a duration written by {@link #writeDuration(ObjectOutput, Duration)}.
+     *
+     * @param in Input stream.
+     * @return Duration.
+     * @throws IOException If failed.
+     */
+    private Duration readDuration(ObjectInput in) throws IOException {
+        long ttl = in.readLong();
+
+        assert ttl >= 0;
+
+        if (ttl == TTL_ETERNAL)
+            return Duration.ETERNAL;
+
+        return new Duration(TimeUnit.MILLISECONDS, ttl);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        byte flags = 0;
+
+        // Going through the getters keeps an instance restored by readExternal() (no wrapped policy)
+        // writable again.
+        Duration create = getExpiryForCreation();
+
+        if (create != null)
+            flags |= CREATE_TTL_MASK;
+
+        Duration update = getExpiryForUpdate();
+
+        if (update != null)
+            flags |= UPDATE_TTL_MASK;
+
+        Duration access = getExpiryForAccess();
+
+        if (access != null)
+            flags |= ACCESS_TTL_MASK;
+
+        // The flags byte tells the reader which of the three durations follow.
+        out.writeByte(flags);
+
+        writeDuration(out, create);
+
+        writeDuration(out, update);
+
+        writeDuration(out, access);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        byte flags = in.readByte();
+
+        // Durations are read in the order they were written: creation, update, access.
+        if ((flags & CREATE_TTL_MASK) != 0)
+            forCreate = readDuration(in);
+
+        if ((flags & UPDATE_TTL_MASK) != 0)
+            forUpdate = readDuration(in);
+
+        if ((flags & ACCESS_TTL_MASK) != 0)
+            forAccess = readDuration(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteExternalizableExpiryPolicy.class, this);
+    }
+}

Reply via email to