http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
deleted file mode 100644
index e704006..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ /dev/null
@@ -1,702 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.util.direct.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.nio.*;
-import java.util.*;
-
-/**
- * Near transaction finish request.
- */
-public class GridDhtTxFinishRequest<K, V> extends 
GridDistributedTxFinishRequest<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Near node ID. */
-    private UUID nearNodeId;
-
-    /** Transaction isolation. */
-    private IgniteTxIsolation isolation;
-
-    /** Near writes. */
-    @GridToStringInclude
-    @GridDirectTransient
-    private Collection<IgniteTxEntry<K, V>> nearWrites;
-
-    /** Serialized near writes. */
-    @GridDirectCollection(byte[].class)
-    private Collection<byte[]> nearWritesBytes;
-
-    /** Mini future ID. */
-    private IgniteUuid miniId;
-
-    /** System invalidation flag. */
-    private boolean sysInvalidate;
-
-    /** Topology version. */
-    private long topVer;
-
-    /** Pending versions with order less than one for this message (needed for 
commit ordering). */
-    @GridToStringInclude
-    @GridDirectCollection(GridCacheVersion.class)
-    private Collection<GridCacheVersion> pendingVers;
-
-    /** One phase commit flag for fast-commit path. */
-    private boolean onePhaseCommit;
-
-    /** One phase commit write version. */
-    private GridCacheVersion writeVer;
-
-    /** Subject ID. */
-    @GridDirectVersion(1)
-    private UUID subjId;
-
-    /** Task name hash. */
-    @GridDirectVersion(2)
-    private int taskNameHash;
-
-    /** TTLs for optimistic transaction. */
-    private GridLongList ttls;
-
-    /** Near cache TTLs for optimistic transaction. */
-    private GridLongList nearTtls;
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    public GridDhtTxFinishRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param nearNodeId Near node ID.
-     * @param futId Future ID.
-     * @param miniId Mini future ID.
-     * @param topVer Topology version.
-     * @param xidVer Transaction ID.
-     * @param threadId Thread ID.
-     * @param commitVer Commit version.
-     * @param isolation Transaction isolation.
-     * @param commit Commit flag.
-     * @param invalidate Invalidate flag.
-     * @param sys System flag.
-     * @param sysInvalidate System invalidation flag.
-     * @param syncCommit Synchronous commit flag.
-     * @param syncRollback Synchronous rollback flag.
-     * @param baseVer Base version.
-     * @param committedVers Committed versions.
-     * @param rolledbackVers Rolled back versions.
-     * @param pendingVers Pending versions.
-     * @param txSize Expected transaction size.
-     * @param writes Write entries.
-     * @param nearWrites Near cache writes.
-     * @param recoverWrites Recovery write entries.
-     * @param onePhaseCommit One phase commit flag.
-     * @param grpLockKey Group lock key.
-     * @param subjId Subject ID.
-     * @param taskNameHash Task name hash.
-     */
-    public GridDhtTxFinishRequest(
-        UUID nearNodeId,
-        IgniteUuid futId,
-        IgniteUuid miniId,
-        long topVer,
-        GridCacheVersion xidVer,
-        GridCacheVersion commitVer,
-        long threadId,
-        IgniteTxIsolation isolation,
-        boolean commit,
-        boolean invalidate,
-        boolean sys,
-        boolean sysInvalidate,
-        boolean syncCommit,
-        boolean syncRollback,
-        GridCacheVersion baseVer,
-        Collection<GridCacheVersion> committedVers,
-        Collection<GridCacheVersion> rolledbackVers,
-        Collection<GridCacheVersion> pendingVers,
-        int txSize,
-        Collection<IgniteTxEntry<K, V>> writes,
-        Collection<IgniteTxEntry<K, V>> nearWrites,
-        Collection<IgniteTxEntry<K, V>> recoverWrites,
-        boolean onePhaseCommit,
-        @Nullable IgniteTxKey grpLockKey,
-        @Nullable UUID subjId,
-        int taskNameHash
-    ) {
-        super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, 
syncCommit, syncRollback, baseVer,
-            committedVers, rolledbackVers, txSize, writes, recoverWrites, 
grpLockKey);
-
-        assert miniId != null;
-        assert nearNodeId != null;
-        assert isolation != null;
-
-        this.pendingVers = pendingVers;
-        this.topVer = topVer;
-        this.nearNodeId = nearNodeId;
-        this.isolation = isolation;
-        this.nearWrites = nearWrites;
-        this.miniId = miniId;
-        this.sysInvalidate = sysInvalidate;
-        this.onePhaseCommit = onePhaseCommit;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean allowForStartup() {
-        return true;
-    }
-
-    /**
-     * @return Near writes.
-     */
-    public Collection<IgniteTxEntry<K, V>> nearWrites() {
-        return nearWrites == null ? Collections.<IgniteTxEntry<K, 
V>>emptyList() : nearWrites;
-    }
-
-    /**
-     * @return Mini ID.
-     */
-    public IgniteUuid miniId() {
-        return miniId;
-    }
-
-    /**
-     * @return Subject ID.
-     */
-    @Nullable public UUID subjectId() {
-        return subjId;
-    }
-
-    /**
-     * @return Task name hash.
-     */
-    public int taskNameHash() {
-        return taskNameHash;
-    }
-
-    /**
-     * @return Transaction isolation.
-     */
-    public IgniteTxIsolation isolation() {
-        return isolation;
-    }
-
-    /**
-     * @return Near node ID.
-     */
-    public UUID nearNodeId() {
-        return nearNodeId;
-    }
-
-    /**
-     * @return System invalidate flag.
-     */
-    public boolean isSystemInvalidate() {
-        return sysInvalidate;
-    }
-
-    /**
-     * @return One phase commit flag.
-     */
-    public boolean onePhaseCommit() {
-        return onePhaseCommit;
-    }
-
-    /**
-     * @return Write version for one-phase commit transactions.
-     */
-    public GridCacheVersion writeVersion() {
-        return writeVer;
-    }
-
-    /**
-     * @param writeVer Write version for one-phase commit transactions.
-     */
-    public void writeVersion(GridCacheVersion writeVer) {
-        this.writeVer = writeVer;
-    }
-
-    /**
-     * @return Topology version.
-     */
-    @Override public long topologyVersion() {
-        return topVer;
-    }
-
-    /**
-     * Gets versions of not acquired locks with version less then one of 
transaction being committed.
-     *
-     * @return Versions of locks for entries participating in transaction that 
have not been acquired yet
-     *      have version less then one of transaction being committed.
-     */
-    public Collection<GridCacheVersion> pendingVersions() {
-        return pendingVers == null ? Collections.<GridCacheVersion>emptyList() 
: pendingVers;
-    }
-
-    /**
-     * @param idx Entry index.
-     * @param ttl TTL.
-     */
-    public void ttl(int idx, long ttl) {
-        if (ttl != -1L) {
-            if (ttls == null) {
-                ttls = new GridLongList();
-
-                for (int i = 0; i < idx - 1; i++)
-                    ttls.add(-1L);
-            }
-        }
-
-        if (ttls != null)
-            ttls.add(ttl);
-    }
-
-    /**
-     * @return TTLs for optimistic transaction.
-     */
-    public GridLongList ttls() {
-        return ttls;
-    }
-
-    /**
-     * @param idx Entry index.
-     * @param ttl TTL.
-     */
-    public void nearTtl(int idx, long ttl) {
-        if (ttl != -1L) {
-            if (nearTtls == null) {
-                nearTtls = new GridLongList();
-
-                for (int i = 0; i < idx - 1; i++)
-                    nearTtls.add(-1L);
-            }
-        }
-
-        if (nearTtls != null)
-            nearTtls.add(ttl);
-    }
-
-    /**
-     * @return TTLs for optimistic transaction.
-     */
-    public GridLongList nearTtls() {
-        return nearTtls;
-    }
-
-    /** {@inheritDoc}
-     * @param ctx*/
-    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
-        super.prepareMarshal(ctx);
-
-        if (nearWrites != null) {
-            marshalTx(nearWrites, ctx);
-
-            nearWritesBytes = new ArrayList<>(nearWrites.size());
-
-            for (IgniteTxEntry<K, V> e : nearWrites)
-                nearWritesBytes.add(ctx.marshaller().marshal(e));
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        if (nearWritesBytes != null) {
-            nearWrites = new ArrayList<>(nearWritesBytes.size());
-
-            for (byte[] arr : nearWritesBytes)
-                nearWrites.add(ctx.marshaller().<IgniteTxEntry<K, 
V>>unmarshal(arr, ldr));
-
-            unmarshalTx(nearWrites, true, ctx, ldr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtTxFinishRequest.class, this, 
super.toString());
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        GridDhtTxFinishRequest _clone = new GridDhtTxFinishRequest();
-
-        clone0(_clone);
-
-        return _clone;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-        super.clone0(_msg);
-
-        GridDhtTxFinishRequest _clone = (GridDhtTxFinishRequest)_msg;
-
-        _clone.nearNodeId = nearNodeId;
-        _clone.isolation = isolation;
-        _clone.nearWrites = nearWrites;
-        _clone.nearWritesBytes = nearWritesBytes;
-        _clone.miniId = miniId;
-        _clone.sysInvalidate = sysInvalidate;
-        _clone.topVer = topVer;
-        _clone.pendingVers = pendingVers;
-        _clone.onePhaseCommit = onePhaseCommit;
-        _clone.writeVer = writeVer;
-        _clone.subjId = subjId;
-        _clone.taskNameHash = taskNameHash;
-        _clone.ttls = ttls;
-        _clone.nearTtls = nearTtls;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean writeTo(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.writeTo(buf))
-            return false;
-
-        if (!commState.typeWritten) {
-            if (!commState.putByte(directType()))
-                return false;
-
-            commState.typeWritten = true;
-        }
-
-        switch (commState.idx) {
-            case 21:
-                if (!commState.putEnum(isolation))
-                    return false;
-
-                commState.idx++;
-
-            case 22:
-                if (!commState.putGridUuid(miniId))
-                    return false;
-
-                commState.idx++;
-
-            case 23:
-                if (!commState.putUuid(nearNodeId))
-                    return false;
-
-                commState.idx++;
-
-            case 24:
-                if (nearWritesBytes != null) {
-                    if (commState.it == null) {
-                        if (!commState.putInt(nearWritesBytes.size()))
-                            return false;
-
-                        commState.it = nearWritesBytes.iterator();
-                    }
-
-                    while (commState.it.hasNext() || commState.cur != NULL) {
-                        if (commState.cur == NULL)
-                            commState.cur = commState.it.next();
-
-                        if (!commState.putByteArray((byte[])commState.cur))
-                            return false;
-
-                        commState.cur = NULL;
-                    }
-
-                    commState.it = null;
-                } else {
-                    if (!commState.putInt(-1))
-                        return false;
-                }
-
-                commState.idx++;
-
-            case 25:
-                if (!commState.putBoolean(onePhaseCommit))
-                    return false;
-
-                commState.idx++;
-
-            case 26:
-                if (pendingVers != null) {
-                    if (commState.it == null) {
-                        if (!commState.putInt(pendingVers.size()))
-                            return false;
-
-                        commState.it = pendingVers.iterator();
-                    }
-
-                    while (commState.it.hasNext() || commState.cur != NULL) {
-                        if (commState.cur == NULL)
-                            commState.cur = commState.it.next();
-
-                        if 
(!commState.putCacheVersion((GridCacheVersion)commState.cur))
-                            return false;
-
-                        commState.cur = NULL;
-                    }
-
-                    commState.it = null;
-                } else {
-                    if (!commState.putInt(-1))
-                        return false;
-                }
-
-                commState.idx++;
-
-            case 27:
-                if (!commState.putBoolean(sysInvalidate))
-                    return false;
-
-                commState.idx++;
-
-            case 28:
-                if (!commState.putLong(topVer))
-                    return false;
-
-                commState.idx++;
-
-            case 29:
-                if (!commState.putCacheVersion(writeVer))
-                    return false;
-
-                commState.idx++;
-
-            case 30:
-                if (!commState.putUuid(subjId))
-                    return false;
-
-                commState.idx++;
-
-            case 31:
-                if (!commState.putInt(taskNameHash))
-                    return false;
-
-                commState.idx++;
-
-            case 32:
-                if (!commState.putLongList(ttls))
-                    return false;
-
-                commState.idx++;
-
-            case 33:
-                if (!commState.putLongList(nearTtls))
-                    return false;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean readFrom(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.readFrom(buf))
-            return false;
-
-        switch (commState.idx) {
-            case 21:
-                if (buf.remaining() < 1)
-                    return false;
-
-                byte isolation0 = commState.getByte();
-
-                isolation = IgniteTxIsolation.fromOrdinal(isolation0);
-
-                commState.idx++;
-
-            case 22:
-                IgniteUuid miniId0 = commState.getGridUuid();
-
-                if (miniId0 == GRID_UUID_NOT_READ)
-                    return false;
-
-                miniId = miniId0;
-
-                commState.idx++;
-
-            case 23:
-                UUID nearNodeId0 = commState.getUuid();
-
-                if (nearNodeId0 == UUID_NOT_READ)
-                    return false;
-
-                nearNodeId = nearNodeId0;
-
-                commState.idx++;
-
-            case 24:
-                if (commState.readSize == -1) {
-                    if (buf.remaining() < 4)
-                        return false;
-
-                    commState.readSize = commState.getInt();
-                }
-
-                if (commState.readSize >= 0) {
-                    if (nearWritesBytes == null)
-                        nearWritesBytes = new ArrayList<>(commState.readSize);
-
-                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
-                        byte[] _val = commState.getByteArray();
-
-                        if (_val == BYTE_ARR_NOT_READ)
-                            return false;
-
-                        nearWritesBytes.add((byte[])_val);
-
-                        commState.readItems++;
-                    }
-                }
-
-                commState.readSize = -1;
-                commState.readItems = 0;
-
-                commState.idx++;
-
-            case 25:
-                if (buf.remaining() < 1)
-                    return false;
-
-                onePhaseCommit = commState.getBoolean();
-
-                commState.idx++;
-
-            case 26:
-                if (commState.readSize == -1) {
-                    if (buf.remaining() < 4)
-                        return false;
-
-                    commState.readSize = commState.getInt();
-                }
-
-                if (commState.readSize >= 0) {
-                    if (pendingVers == null)
-                        pendingVers = new ArrayList<>(commState.readSize);
-
-                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
-                        GridCacheVersion _val = commState.getCacheVersion();
-
-                        if (_val == CACHE_VER_NOT_READ)
-                            return false;
-
-                        pendingVers.add((GridCacheVersion)_val);
-
-                        commState.readItems++;
-                    }
-                }
-
-                commState.readSize = -1;
-                commState.readItems = 0;
-
-                commState.idx++;
-
-            case 27:
-                if (buf.remaining() < 1)
-                    return false;
-
-                sysInvalidate = commState.getBoolean();
-
-                commState.idx++;
-
-            case 28:
-                if (buf.remaining() < 8)
-                    return false;
-
-                topVer = commState.getLong();
-
-                commState.idx++;
-
-            case 29:
-                GridCacheVersion writeVer0 = commState.getCacheVersion();
-
-                if (writeVer0 == CACHE_VER_NOT_READ)
-                    return false;
-
-                writeVer = writeVer0;
-
-                commState.idx++;
-
-            case 30:
-                UUID subjId0 = commState.getUuid();
-
-                if (subjId0 == UUID_NOT_READ)
-                    return false;
-
-                subjId = subjId0;
-
-                commState.idx++;
-
-            case 31:
-                if (buf.remaining() < 4)
-                    return false;
-
-                taskNameHash = commState.getInt();
-
-                commState.idx++;
-
-            case 32:
-                GridLongList ttls0 = commState.getLongList();
-
-                if (ttls0 == LONG_LIST_NOT_READ)
-                    return false;
-
-                ttls = ttls0;
-
-                commState.idx++;
-
-            case 33:
-                GridLongList nearTtls0 = commState.getLongList();
-
-                if (nearTtls0 == LONG_LIST_NOT_READ)
-                    return false;
-
-                nearTtls = nearTtls0;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 31;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
deleted file mode 100644
index a086789..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.apache.ignite.internal.util.direct.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * DHT transaction finish response.
- */
-public class GridDhtTxFinishResponse<K, V> extends 
GridDistributedTxFinishResponse<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Mini future ID. */
-    private IgniteUuid miniId;
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public GridDhtTxFinishResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param xid Xid version.
-     * @param futId Future ID.
-     * @param miniId Mini future ID.
-     */
-    public GridDhtTxFinishResponse(GridCacheVersion xid, IgniteUuid futId, 
IgniteUuid miniId) {
-        super(xid, futId);
-
-        assert miniId != null;
-
-        this.miniId = miniId;
-    }
-
-    /**
-     * @return Mini future ID.
-     */
-    public IgniteUuid miniId() {
-        return miniId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtTxFinishResponse.class, this, 
super.toString());
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        GridDhtTxFinishResponse _clone = new GridDhtTxFinishResponse();
-
-        clone0(_clone);
-
-        return _clone;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-        super.clone0(_msg);
-
-        GridDhtTxFinishResponse _clone = (GridDhtTxFinishResponse)_msg;
-
-        _clone.miniId = miniId;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean writeTo(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.writeTo(buf))
-            return false;
-
-        if (!commState.typeWritten) {
-            if (!commState.putByte(directType()))
-                return false;
-
-            commState.typeWritten = true;
-        }
-
-        switch (commState.idx) {
-            case 5:
-                if (!commState.putGridUuid(miniId))
-                    return false;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean readFrom(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.readFrom(buf))
-            return false;
-
-        switch (commState.idx) {
-            case 5:
-                IgniteUuid miniId0 = commState.getGridUuid();
-
-                if (miniId0 == GRID_UUID_NOT_READ)
-                    return false;
-
-                miniId = miniId0;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 32;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
deleted file mode 100644
index 5036071..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ /dev/null
@@ -1,656 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
-
-/**
- * Replicated user transaction.
- */
-public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> 
implements GridCacheMappedVersion {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private UUID nearNodeId;
-
-    /** Near future ID. */
-    private IgniteUuid nearFutId;
-
-    /** Near future ID. */
-    private IgniteUuid nearMiniId;
-
-    /** Near future ID. */
-    private IgniteUuid nearFinFutId;
-
-    /** Near future ID. */
-    private IgniteUuid nearFinMiniId;
-
-    /** Near XID. */
-    private GridCacheVersion nearXidVer;
-
-    /** Transaction nodes mapping (primary node -> related backup nodes). */
-    private Map<UUID, Collection<UUID>> txNodes;
-
-    /** Future. */
-    @GridToStringExclude
-    private final AtomicReference<GridDhtTxPrepareFuture<K, V>> prepFut =
-        new AtomicReference<>();
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    public GridDhtTxLocal() {
-        // No-op.
-    }
-
-    /**
-     * @param nearNodeId Near node ID that initiated transaction.
-     * @param nearXidVer Near transaction ID.
-     * @param nearFutId Near future ID.
-     * @param nearMiniId Near mini future ID.
-     * @param nearThreadId Near thread ID.
-     * @param implicit Implicit flag.
-     * @param implicitSingle Implicit-with-single-key flag.
-     * @param cctx Cache context.
-     * @param concurrency Concurrency.
-     * @param isolation Isolation.
-     * @param timeout Timeout.
-     * @param storeEnabled Store enabled flag.
-     * @param txSize Expected transaction size.
-     * @param grpLockKey Group lock key if this is a group-lock transaction.
-     * @param partLock {@code True} if this is a group-lock transaction and 
whole partition should be locked.
-     * @param txNodes Transaction nodes mapping.
-     */
-    public GridDhtTxLocal(
-        GridCacheSharedContext<K, V> cctx,
-        UUID nearNodeId,
-        GridCacheVersion nearXidVer,
-        IgniteUuid nearFutId,
-        IgniteUuid nearMiniId,
-        long nearThreadId,
-        boolean implicit,
-        boolean implicitSingle,
-        boolean sys,
-        IgniteTxConcurrency concurrency,
-        IgniteTxIsolation isolation,
-        long timeout,
-        boolean invalidate,
-        boolean storeEnabled,
-        int txSize,
-        @Nullable IgniteTxKey grpLockKey,
-        boolean partLock,
-        Map<UUID, Collection<UUID>> txNodes,
-        UUID subjId,
-        int taskNameHash
-    ) {
-        super(
-            cctx,
-            cctx.versions().onReceivedAndNext(nearNodeId, nearXidVer),
-            implicit,
-            implicitSingle,
-            sys,
-            concurrency,
-            isolation,
-            timeout,
-            invalidate,
-            storeEnabled,
-            txSize,
-            grpLockKey,
-            partLock,
-            subjId,
-            taskNameHash);
-
-        assert cctx != null;
-        assert nearNodeId != null;
-        assert nearFutId != null;
-        assert nearMiniId != null;
-        assert nearXidVer != null;
-
-        this.nearNodeId = nearNodeId;
-        this.nearXidVer = nearXidVer;
-        this.nearFutId = nearFutId;
-        this.nearMiniId = nearMiniId;
-        this.txNodes = txNodes;
-
-        threadId = nearThreadId;
-
-        assert !F.eq(xidVer, nearXidVer);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<UUID, Collection<UUID>> transactionNodes() {
-        return txNodes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID eventNodeId() {
-        return nearNodeId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<UUID> masterNodeIds() {
-        assert nearNodeId != null;
-
-        return Collections.singleton(nearNodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID otherNodeId() {
-        assert nearNodeId != null;
-
-        return nearNodeId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID originatingNodeId() {
-        return nearNodeId;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected UUID nearNodeId() {
-        return nearNodeId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion nearXidVersion() {
-        return nearXidVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion mappedVersion() {
-        return nearXidVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteUuid nearFutureId() {
-        return nearFutId;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteUuid nearMiniId() {
-        return nearMiniId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean dht() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean updateNearCache(GridCacheContext<K, V> 
cacheCtx, K key, long topVer) {
-        return cacheCtx.isDht() && isNearEnabled(cacheCtx) && 
!cctx.localNodeId().equals(nearNodeId());
-    }
-
-    /**
-     * @return Near future ID.
-     */
-    public IgniteUuid nearFinishFutureId() {
-        return nearFinFutId;
-    }
-
-    /**
-     * @param nearFinFutId Near future ID.
-     */
-    public void nearFinishFutureId(IgniteUuid nearFinFutId) {
-        this.nearFinFutId = nearFinFutId;
-    }
-
-    /**
-     * @return Near future mini ID.
-     */
-    public IgniteUuid nearFinishMiniId() {
-        return nearFinMiniId;
-    }
-
-    /**
-     * @param nearFinMiniId Near future mini ID.
-     */
-    public void nearFinishMiniId(IgniteUuid nearFinMiniId) {
-        this.nearFinMiniId = nearFinMiniId;
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable protected IgniteFuture<Boolean> addReader(long msgId, 
GridDhtCacheEntry<K, V> cached,
-        IgniteTxEntry<K, V> entry, long topVer) {
-        // Don't add local node as reader.
-        if (!cctx.localNodeId().equals(nearNodeId)) {
-            GridCacheContext<K, V> cacheCtx = cached.context();
-
-            while (true) {
-                try {
-                    return cached.addReader(nearNodeId, msgId, topVer);
-                }
-                catch (GridCacheEntryRemovedException ignore) {
-                    if (log.isDebugEnabled())
-                        log.debug("Got removed entry when adding to DHT local 
transaction: " + cached);
-
-                    cached = cacheCtx.dht().entryExx(entry.key(), topVer);
-                }
-            }
-        }
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void updateExplicitVersion(IgniteTxEntry<K, V> 
txEntry, GridCacheEntryEx<K, V> entry)
-        throws GridCacheEntryRemovedException {
-        // DHT local transactions don't have explicit locks.
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<IgniteTxEx<K, V>> prepareAsync() {
-        if (optimistic()) {
-            assert isSystemInvalidate();
-
-            return prepareAsync(null, null, Collections.<IgniteTxKey<K>, 
GridCacheVersion>emptyMap(), 0, nearMiniId, null, true,
-                null);
-        }
-
-        // For pessimistic mode we don't distribute prepare request.
-        GridDhtTxPrepareFuture<K, V> fut = prepFut.get();
-
-        if (fut == null) {
-            // Future must be created before any exception can be thrown.
-            if (!prepFut.compareAndSet(null, fut = new 
GridDhtTxPrepareFuture<>(cctx, this, nearMiniId,
-                Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), 
true, null)))
-                return prepFut.get();
-        }
-        else
-            // Prepare was called explicitly.
-            return fut;
-
-        if (!state(PREPARING)) {
-            if (setRollbackOnly()) {
-                if (timedOut())
-                    fut.onError(new IgniteTxTimeoutException("Transaction 
timed out and was rolled back: " + this));
-                else
-                    fut.onError(new IgniteCheckedException("Invalid 
transaction state for prepare [state=" + state() +
-                        ", tx=" + this + ']'));
-            }
-            else
-                fut.onError(new IgniteTxRollbackException("Invalid transaction 
state for prepare [state=" + state()
-                    + ", tx=" + this + ']'));
-
-            return fut;
-        }
-
-        try {
-            userPrepare();
-
-            if (!state(PREPARED)) {
-                setRollbackOnly();
-
-                fut.onError(new IgniteCheckedException("Invalid transaction 
state for commit [state=" + state() +
-                    ", tx=" + this + ']'));
-
-                return fut;
-            }
-
-            fut.complete();
-
-            return fut;
-        }
-        catch (IgniteCheckedException e) {
-            fut.onError(e);
-
-            return fut;
-        }
-    }
-
-    /**
-     * Prepares next batch of entries in dht transaction.
-     *
-     * @param reads Read entries.
-     * @param writes Write entries.
-     * @param verMap Version map.
-     * @param msgId Message ID.
-     * @param nearMiniId Near mini future ID.
-     * @param txNodes Transaction nodes mapping.
-     * @param last {@code True} if this is last prepare request.
-     * @param lastBackups IDs of backup nodes receiving last prepare request.
-     * @return Future that will be completed when locks are acquired.
-     */
-    public IgniteFuture<IgniteTxEx<K, V>> prepareAsync(@Nullable 
Iterable<IgniteTxEntry<K, V>> reads,
-        @Nullable Iterable<IgniteTxEntry<K, V>> writes,
-        Map<IgniteTxKey<K>, GridCacheVersion> verMap,
-        long msgId,
-        IgniteUuid nearMiniId,
-        Map<UUID, Collection<UUID>> txNodes,
-        boolean last,
-        Collection<UUID> lastBackups) {
-        assert optimistic();
-
-        // In optimistic mode prepare still can be called explicitly from 
salvageTx.
-        GridDhtTxPrepareFuture<K, V> fut = prepFut.get();
-
-        if (fut == null) {
-            init();
-
-            // Future must be created before any exception can be thrown.
-            if (!prepFut.compareAndSet(null, fut = new 
GridDhtTxPrepareFuture<>(cctx, this, nearMiniId, verMap, last,
-                lastBackups))) {
-                GridDhtTxPrepareFuture<K, V> f = prepFut.get();
-
-                assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id 
on existing future " +
-                    "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId 
+ ", fut=" + f + ']';
-
-                return f;
-            }
-        }
-        else {
-            assert fut.nearMiniId().equals(nearMiniId) : "Wrong near mini id 
on existing future " +
-                "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + 
", fut=" + fut + ']';
-
-            // Prepare was called explicitly.
-            return fut;
-        }
-
-        if (state() != PREPARING) {
-            if (!state(PREPARING)) {
-                if (state() == PREPARED && isSystemInvalidate())
-                    fut.complete();
-                if (setRollbackOnly()) {
-                    if (timedOut())
-                        fut.onError(new IgniteTxTimeoutException("Transaction 
timed out and was rolled back: " +
-                            this));
-                    else
-                        fut.onError(new IgniteCheckedException("Invalid 
transaction state for prepare [state=" + state() +
-                            ", tx=" + this + ']'));
-                }
-                else
-                    fut.onError(new IgniteTxRollbackException("Invalid 
transaction state for prepare [state=" +
-                        state() + ", tx=" + this + ']'));
-
-                return fut;
-            }
-        }
-
-        try {
-            if (reads != null)
-                for (IgniteTxEntry<K, V> e : reads)
-                    addEntry(msgId, e);
-
-            if (writes != null)
-                for (IgniteTxEntry<K, V> e : writes)
-                    addEntry(msgId, e);
-
-            userPrepare();
-
-            // Make sure to add future before calling prepare on it.
-            cctx.mvcc().addFuture(fut);
-
-            if (isSystemInvalidate())
-                fut.complete();
-            else
-                fut.prepare(reads, writes, txNodes);
-        }
-        catch (IgniteTxTimeoutException | IgniteTxOptimisticException e) {
-            fut.onError(e);
-        }
-        catch (IgniteCheckedException e) {
-            setRollbackOnly();
-
-            fut.onError(new IgniteTxRollbackException("Failed to prepare 
transaction: " + this, e));
-
-            try {
-                rollback();
-            }
-            catch (IgniteTxOptimisticException e1) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed optimistically to prepare transaction 
[tx=" + this + ", e=" + e1 + ']');
-
-                fut.onError(e);
-            }
-            catch (IgniteCheckedException e1) {
-                U.error(log, "Failed to rollback transaction: " + this, e1);
-            }
-        }
-
-        return fut;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"ThrowableInstanceNeverThrown"})
-    @Override public IgniteFuture<IgniteTx> commitAsync() {
-        if (log.isDebugEnabled())
-            log.debug("Committing dht local tx: " + this);
-
-        // In optimistic mode prepare was called explicitly.
-        if (pessimistic())
-            prepareAsync();
-
-        final GridDhtTxFinishFuture<K, V> fut = new 
GridDhtTxFinishFuture<>(cctx, this, /*commit*/true);
-
-        cctx.mvcc().addFuture(fut);
-
-        GridDhtTxPrepareFuture<K, V> prep = prepFut.get();
-
-        if (prep != null) {
-            if (prep.isDone()) {
-                try {
-                    prep.get(); // Check for errors of a parent future.
-
-                    if (finish(true))
-                        fut.finish();
-                    else
-                        fut.onError(new IgniteCheckedException("Failed to 
commit transaction: " + CU.txString(this)));
-                }
-                catch (IgniteTxOptimisticException e) {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to optimistically prepare 
transaction [tx=" + this + ", e=" + e + ']');
-
-                    fut.onError(e);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to prepare transaction: " + this, e);
-
-                    fut.onError(e);
-                }
-            }
-            else
-                prep.listenAsync(new CI1<IgniteFuture<IgniteTxEx<K, V>>>() {
-                    @Override public void apply(IgniteFuture<IgniteTxEx<K, V>> 
f) {
-                        try {
-                            f.get(); // Check for errors of a parent future.
-
-                            if (finish(true))
-                                fut.finish();
-                            else
-                                fut.onError(new IgniteCheckedException("Failed 
to commit transaction: " +
-                                    CU.txString(GridDhtTxLocal.this)));
-                        }
-                        catch (IgniteTxOptimisticException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed optimistically to prepare 
transaction [tx=" + this + ", e=" + e + ']');
-
-                            fut.onError(e);
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to prepare transaction: " + 
this, e);
-
-                            fut.onError(e);
-                        }
-                    }
-                });
-        }
-        else {
-            assert optimistic();
-
-            try {
-                if (finish(true))
-                    fut.finish();
-                else
-                    fut.onError(new IgniteCheckedException("Failed to commit 
transaction: " + CU.txString(this)));
-            }
-            catch (IgniteTxOptimisticException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed optimistically to prepare transaction 
[tx=" + this + ", e=" + e + ']');
-
-                fut.onError(e);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to commit transaction: " + this, e);
-
-                fut.onError(e);
-            }
-        }
-
-        return fut;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture<K, V> 
fut) {
-        assert optimistic();
-
-        prepFut.compareAndSet(fut, null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<IgniteTx> rollbackAsync() {
-        GridDhtTxPrepareFuture<K, V> prepFut = this.prepFut.get();
-
-        final GridDhtTxFinishFuture<K, V> fut = new 
GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false);
-
-        cctx.mvcc().addFuture(fut);
-
-        if (prepFut == null) {
-            try {
-                if (finish(false) || state() == UNKNOWN)
-                    fut.finish();
-                else
-                    fut.onError(new IgniteCheckedException("Failed to commit 
transaction: " + CU.txString(this)));
-            }
-            catch (IgniteTxOptimisticException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed optimistically to prepare transaction 
[tx=" + this + ", e=" + e + ']');
-
-                fut.onError(e);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to rollback transaction (will make the 
best effort to rollback remote nodes): " +
-                    this, e);
-
-                fut.onError(e);
-            }
-        }
-        else {
-            prepFut.complete();
-
-            prepFut.listenAsync(new CI1<IgniteFuture<IgniteTxEx<K, V>>>() {
-                @Override public void apply(IgniteFuture<IgniteTxEx<K, V>> f) {
-                    try {
-                        f.get(); // Check for errors of a parent future.
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to prepare or rollback 
transaction [tx=" + this + ", e=" + e + ']');
-                    }
-
-                    try {
-                        if (finish(false) || state() == UNKNOWN)
-                            fut.finish();
-                        else
-                            fut.onError(new IgniteCheckedException("Failed to 
commit transaction: " +
-                                CU.txString(GridDhtTxLocal.this)));
-
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to gracefully rollback 
transaction: " + CU.txString(GridDhtTxLocal.this),
-                            e);
-
-                        fut.onError(e);
-                    }
-                }
-            });
-        }
-
-        return fut;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
-    @Override public boolean finish(boolean commit) throws 
IgniteCheckedException {
-        assert nearFinFutId != null || isInvalidate() || !commit || 
isSystemInvalidate()
-            || onePhaseCommit() || state() == PREPARED :
-            "Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" 
+ isInvalidate() + ", commit=" + commit +
-            ", sysInvalidate=" + isSystemInvalidate() + ", state=" + state() + 
']';
-
-        assert nearMiniId != null;
-
-        return super.finish(commit);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void sendFinishReply(boolean commit, @Nullable 
Throwable err) {
-        if (nearFinFutId != null) {
-            if (nearNodeId.equals(cctx.localNodeId())) {
-                if (log.isDebugEnabled())
-                    log.debug("Skipping response sending to local node: " + 
this);
-
-                return;
-            }
-
-            GridNearTxFinishResponse<K, V> res = new 
GridNearTxFinishResponse<>(nearXidVer, threadId, nearFinFutId,
-                nearFinMiniId, err);
-
-            try {
-                cctx.io().send(nearNodeId, res, system() ? UTILITY_CACHE_POOL 
: SYSTEM_POOL);
-            }
-            catch (ClusterTopologyException ignored) {
-                if (log.isDebugEnabled())
-                    log.debug("Node left before sending finish response 
(transaction was committed) [node=" +
-                        nearNodeId + ", res=" + res + ']');
-            }
-            catch (Throwable ex) {
-                U.error(log, "Failed to send finish response to node 
(transaction was " +
-                    (commit ? "committed" : "rolledback") + ") [node=" + 
nearNodeId + ", res=" + res + ']', ex);
-            }
-        }
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Will not send finish reply because sender node has 
not sent finish request yet: " + this);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return GridToStringBuilder.toString(GridDhtTxLocal.class, this, 
"super", super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
deleted file mode 100644
index dd8815b..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ /dev/null
@@ -1,831 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
-
-/**
- * Replicated user transaction.
- */
-public abstract class GridDhtTxLocalAdapter<K, V> extends 
IgniteTxLocalAdapter<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Near mappings. */
-    protected Map<UUID, GridDistributedTxMapping<K, V>> nearMap =
-        new ConcurrentHashMap8<>();
-
-    /** DHT mappings. */
-    protected Map<UUID, GridDistributedTxMapping<K, V>> dhtMap =
-        new ConcurrentHashMap8<>();
-
-    /** Mapped flag. */
-    private AtomicBoolean mapped = new AtomicBoolean();
-
-    /** */
-    private long dhtThreadId;
-
-    /** */
-    private boolean needsCompletedVers;
-
-    /** Versions of pending locks for entries of this tx. */
-    private Collection<GridCacheVersion> pendingVers;
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    protected GridDhtTxLocalAdapter() {
-        // No-op.
-    }
-
-    /**
-     * @param xidVer Transaction version.
-     * @param implicit Implicit flag.
-     * @param implicitSingle Implicit-with-single-key flag.
-     * @param cctx Cache context.
-     * @param sys System flag.
-     * @param concurrency Concurrency.
-     * @param isolation Isolation.
-     * @param timeout Timeout.
-     * @param txSize Expected transaction size.
-     * @param grpLockKey Group lock key if this is a group-lock transaction.
-     * @param partLock If this is a group-lock transaction and the whole 
partition should be locked.
-     */
-    protected GridDhtTxLocalAdapter(
-        GridCacheSharedContext<K, V> cctx,
-        GridCacheVersion xidVer,
-        boolean implicit,
-        boolean implicitSingle,
-        boolean sys,
-        IgniteTxConcurrency concurrency,
-        IgniteTxIsolation isolation,
-        long timeout,
-        boolean invalidate,
-        boolean storeEnabled,
-        int txSize,
-        @Nullable IgniteTxKey grpLockKey,
-        boolean partLock,
-        @Nullable UUID subjId,
-        int taskNameHash
-    ) {
-        super(cctx, xidVer, implicit, implicitSingle, sys, concurrency, 
isolation, timeout, invalidate, storeEnabled,
-            txSize, grpLockKey, partLock, subjId, taskNameHash);
-
-        assert cctx != null;
-
-        threadId = Thread.currentThread().getId();
-        dhtThreadId = threadId;
-    }
-
-    /**
-     * @return Near node id.
-     */
-    protected abstract UUID nearNodeId();
-
-    /**
-     * @return Near future ID.
-     */
-    protected abstract IgniteUuid nearFutureId();
-
-    /**
-     * @return Near future mini ID.
-     */
-    protected abstract IgniteUuid nearMiniId();
-
-    /**
-     * Adds reader to cached entry.
-     *
-     * @param msgId Message ID.
-     * @param cached Cached entry.
-     * @param entry Transaction entry.
-     * @param topVer Topology version.
-     * @return {@code True} if reader was added as a result of this call.
-     */
-    @Nullable protected abstract IgniteFuture<Boolean> addReader(long msgId,
-        GridDhtCacheEntry<K, V> cached,
-        IgniteTxEntry<K, V> entry,
-        long topVer);
-
-    /**
-     * @param commit Commit flag.
-     * @param err Error, if any.
-     */
-    protected abstract void sendFinishReply(boolean commit, @Nullable 
Throwable err);
-
-    /**
-     * @param needsCompletedVers {@code True} if needs completed versions.
-     */
-    public void needsCompletedVersions(boolean needsCompletedVers) {
-        this.needsCompletedVers |= needsCompletedVers;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean needsCompletedVersions() {
-        return needsCompletedVers;
-    }
-
-    /**
-     * @return Versions for all pending locks that were in queue before tx 
locks were released.
-     */
-    public Collection<GridCacheVersion> pendingVersions() {
-        return pendingVers == null ? Collections.<GridCacheVersion>emptyList() 
: pendingVers;
-    }
-
-    /**
-     * @param pendingVers Versions for all pending locks that were in queue 
before tx locsk were released.
-     */
-    public void pendingVersions(Collection<GridCacheVersion> pendingVers) {
-        this.pendingVers = pendingVers;
-    }
-
-    /**
-     * @return DHT thread ID.
-     */
-    long dhtThreadId() {
-        return dhtThreadId;
-    }
-
-    /**
-     * Map explicit locks.
-     */
-    protected void mapExplicitLocks() {
-        if (!mapped.get()) {
-            // Explicit locks may participate in implicit transactions only.
-            if (!implicit()) {
-                mapped.set(true);
-
-                return;
-            }
-
-            Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> dhtEntryMap = null;
-            Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> nearEntryMap = 
null;
-
-            for (IgniteTxEntry<K, V> e : allEntries()) {
-                assert e.cached() != null;
-
-                GridCacheContext<K, V> cacheCtx = e.cached().context();
-
-                if (cacheCtx.isNear())
-                    continue;
-
-                if (e.cached().obsolete()) {
-                    GridCacheEntryEx<K, V> cached = 
cacheCtx.cache().entryEx(e.key());
-
-                    e.cached(cached, cached.keyBytes());
-                }
-
-                if (e.cached().detached() || e.cached().isLocal())
-                    continue;
-
-                while (true) {
-                    try {
-                        // Map explicit locks.
-                        if (e.explicitVersion() != null && 
!e.explicitVersion().equals(xidVer)) {
-                            if (dhtEntryMap == null)
-                                dhtEntryMap = new GridLeanMap<>();
-
-                            if (nearEntryMap == null)
-                                nearEntryMap = new GridLeanMap<>();
-
-                            cacheCtx.dhtMap(nearNodeId(), topologyVersion(),
-                                (GridDhtCacheEntry<K, V>)e.cached(), log, 
dhtEntryMap, nearEntryMap);
-                        }
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignore) {
-                        GridCacheEntryEx<K, V> cached = 
cacheCtx.cache().entryEx(e.key());
-
-                        e.cached(cached, cached.keyBytes());
-                    }
-                }
-            }
-
-            if (!F.isEmpty(dhtEntryMap))
-                addDhtMapping(dhtEntryMap);
-
-            if (!F.isEmpty(nearEntryMap))
-                addNearMapping(nearEntryMap);
-
-            mapped.set(true);
-        }
-    }
-
-    /**
-     * @return DHT map.
-     */
-    Map<UUID, GridDistributedTxMapping<K, V>> dhtMap() {
-        mapExplicitLocks();
-
-        return dhtMap;
-    }
-
-    /**
-     * @return Near map.
-     */
-    Map<UUID, GridDistributedTxMapping<K, V>> nearMap() {
-        mapExplicitLocks();
-
-        return nearMap;
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @return Mapping.
-     */
-    GridDistributedTxMapping<K, V> dhtMapping(UUID nodeId) {
-        return dhtMap.get(nodeId);
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @return Mapping.
-     */
-    GridDistributedTxMapping<K, V> nearMapping(UUID nodeId) {
-        return nearMap.get(nodeId);
-    }
-
-    /**
-     * @param mappings Mappings to add.
-     */
-    void addDhtMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> 
mappings) {
-        addMapping(mappings, dhtMap);
-    }
-
-    /**
-     * @param mappings Mappings to add.
-     */
-    void addNearMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> 
mappings) {
-        addMapping(mappings, nearMap);
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @return {@code True} if mapping was removed.
-     */
-    public boolean removeMapping(UUID nodeId) {
-        return removeMapping(nodeId, null, dhtMap) | removeMapping(nodeId, 
null, nearMap);
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param entry Entry to remove.
-     * @return {@code True} if was removed.
-     */
-    boolean removeDhtMapping(UUID nodeId, GridCacheEntryEx<K, V> entry) {
-        return removeMapping(nodeId, entry, dhtMap);
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param entry Entry to remove.
-     * @return {@code True} if was removed.
-     */
-    boolean removeNearMapping(UUID nodeId, GridCacheEntryEx<K, V> entry) {
-        return removeMapping(nodeId, entry, nearMap);
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param entry Entry to remove.
-     * @param map Map to remove from.
-     * @return {@code True} if was removed.
-     */
-    private boolean removeMapping(UUID nodeId, @Nullable GridCacheEntryEx<K, 
V> entry,
-        Map<UUID, GridDistributedTxMapping<K, V>> map) {
-        if (entry != null) {
-            if (log.isDebugEnabled())
-                log.debug("Removing mapping for entry [nodeId=" + nodeId + ", 
entry=" + entry + ']');
-
-            IgniteTxEntry<K, V> txEntry = txMap.get(entry.txKey());
-
-            if (txEntry == null)
-                return false;
-
-            GridDistributedTxMapping<K, V> m = map.get(nodeId);
-
-            boolean ret = m != null && m.removeEntry(txEntry);
-
-            if (m != null && m.empty())
-                map.remove(nodeId);
-
-            return ret;
-        }
-        else
-            return map.remove(nodeId) != null;
-    }
-
-    /**
-     * @param mappings Entry mappings.
-     * @param map Transaction mappings.
-     */
-    private void addMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> 
mappings,
-        Map<UUID, GridDistributedTxMapping<K, V>> map) {
-        for (Map.Entry<ClusterNode, List<GridDhtCacheEntry<K, V>>> mapping : 
mappings.entrySet()) {
-            ClusterNode n = mapping.getKey();
-
-            for (GridDhtCacheEntry<K, V> entry : mapping.getValue()) {
-                IgniteTxEntry<K, V> txEntry = txMap.get(entry.txKey());
-
-                if (txEntry != null) {
-                    GridDistributedTxMapping<K, V> m = map.get(n.id());
-
-                    if (m == null)
-                        map.put(n.id(), m = new GridDistributedTxMapping<>(n));
-
-                    m.add(txEntry);
-                }
-            }
-        }
-    }
-
-
-    /** {@inheritDoc} */
-    @Override public void addInvalidPartition(GridCacheContext<K, V> ctx, int 
part) {
-        assert false : "DHT transaction encountered invalid partition [part=" 
+ part + ", tx=" + this + ']';
-    }
-
-
-    /**
-     * @param msgId Message ID.
-     * @param e Entry to add.
-     * @return Future for active transactions for the time when reader was 
added.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable public IgniteFuture<Boolean> addEntry(long msgId, 
IgniteTxEntry<K, V> e) throws IgniteCheckedException {
-        init();
-
-        IgniteTxState state = state();
-
-        assert state == ACTIVE || (state == PREPARING && optimistic()) : 
"Invalid tx state for " +
-            "adding entry [msgId=" + msgId + ", e=" + e + ", tx=" + this + ']';
-
-        e.unmarshal(cctx, false, cctx.deploy().globalLoader());
-
-        checkInternal(e.txKey());
-
-        state = state();
-
-        assert state == ACTIVE || (state == PREPARING && optimistic()): 
"Invalid tx state for adding entry: " + e;
-
-        GridCacheContext<K, V> cacheCtx = e.context();
-
-        GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? 
cacheCtx.near().dht() : cacheCtx.dht();
-
-        try {
-            IgniteTxEntry<K, V> entry = txMap.get(e.txKey());
-
-            if (entry != null) {
-                entry.op(e.op()); // Absolutely must set operation, as default 
is DELETE.
-                entry.value(e.value(), e.hasWriteValue(), e.hasReadValue());
-                entry.entryProcessors(e.entryProcessors());
-                entry.valueBytes(e.valueBytes());
-                entry.ttl(e.ttl());
-                entry.filters(e.filters());
-                entry.drExpireTime(e.drExpireTime());
-            }
-            else {
-                entry = e;
-
-                addActiveCache(dhtCache.context());
-
-                while (true) {
-                    GridDhtCacheEntry<K, V> cached = 
dhtCache.entryExx(entry.key(), topologyVersion());
-
-                    try {
-                        // Set key bytes to avoid serializing in future.
-                        cached.keyBytes(entry.keyBytes());
-
-                        entry.cached(cached, entry.keyBytes());
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("Got removed entry when adding to dht tx 
(will retry): " + cached);
-                    }
-                }
-
-                GridCacheVersion explicit = entry.explicitVersion();
-
-                if (explicit != null) {
-                    GridCacheVersion dhtVer = 
cctx.mvcc().mappedVersion(explicit);
-
-                    if (dhtVer == null)
-                        throw new IgniteCheckedException("Failed to find dht 
mapping for explicit entry version: " + entry);
-
-                    entry.explicitVersion(dhtVer);
-                }
-
-                txMap.put(entry.txKey(), entry);
-
-                if (log.isDebugEnabled())
-                    log.debug("Added entry to transaction: " + entry);
-            }
-
-            return addReader(msgId, dhtCache.entryExx(entry.key()), entry, 
topologyVersion());
-        }
-        catch (GridDhtInvalidPartitionException ex) {
-            addInvalidPartition(cacheCtx, ex.partition());
-
-            return new GridFinishedFuture<>(cctx.kernalContext(), true);
-        }
-    }
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param entries Entries to lock.
-     * @param writeEntries Write entries for implicit transactions mapped to 
one node.
-     * @param onePhaseCommit One phase commit flag.
-     * @param drVers DR versions.
-     * @param msgId Message ID.
-     * @param implicit Implicit flag.
-     * @param read Read flag.
-     * @param accessTtl TTL for read operation.
-     * @return Lock future.
-     */
-    IgniteFuture<GridCacheReturn<V>> lockAllAsync(
-        GridCacheContext<K, V> cacheCtx,
-        Collection<GridCacheEntryEx<K, V>> entries,
-        List<IgniteTxEntry<K, V>> writeEntries,
-        boolean onePhaseCommit,
-        GridCacheVersion[] drVers,
-        long msgId,
-        boolean implicit,
-        final boolean read,
-        long accessTtl
-    ) {
-        try {
-            checkValid();
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(cctx.kernalContext(), e);
-        }
-
-        final GridCacheReturn<V> ret = new GridCacheReturn<>(false);
-
-        if (F.isEmpty(entries))
-            return new GridFinishedFuture<>(cctx.kernalContext(), ret);
-
-        init();
-
-        onePhaseCommit(onePhaseCommit);
-
-        try {
-            assert drVers == null || entries.size() == drVers.length;
-
-            Set<K> skipped = null;
-
-            int idx = 0;
-            int drVerIdx = 0;
-
-            long topVer = topologyVersion();
-
-            GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? 
cacheCtx.near().dht() : cacheCtx.dht();
-
-            // Enlist locks into transaction.
-            for (GridCacheEntryEx<K, V> entry : entries) {
-                K key = entry.key();
-
-                IgniteTxEntry<K, V> txEntry = entry(entry.txKey());
-
-                // First time access.
-                if (txEntry == null) {
-                    GridDhtCacheEntry<K, V> cached = dhtCache.entryExx(key, 
topVer);
-
-                    addActiveCache(dhtCache.context());
-
-                    cached.unswap(!read, read);
-
-                    IgniteTxEntry<K, V>
-                        w = writeEntries == null ? null : 
writeEntries.get(idx++);
-
-                    txEntry = addEntry(NOOP,
-                        null,
-                        null,
-                        null,
-                        cached,
-                        null,
-                        CU.<K, V>empty(),
-                        false,
-                        -1L,
-                        -1L,
-                        drVers != null ? drVers[drVerIdx++] : null);
-
-                    if (w != null) {
-                        assert key.equals(w.key()) : "Invalid entry [cached=" 
+ cached + ", w=" + w + ']';
-
-                        txEntry.op(w.op());
-                        txEntry.value(w.value(), w.hasWriteValue(), 
w.hasReadValue());
-                        txEntry.valueBytes(w.valueBytes());
-                        txEntry.drVersion(w.drVersion());
-                        txEntry.entryProcessors(w.entryProcessors());
-                        txEntry.ttl(w.ttl());
-                        txEntry.filters(w.filters());
-                        txEntry.drExpireTime(w.drExpireTime());
-                        txEntry.expiry(w.expiry());
-                    }
-                    else if (read)
-                        txEntry.ttl(accessTtl);
-
-                    txEntry.cached(cached, txEntry.keyBytes());
-
-                    addReader(msgId, cached, txEntry, topVer);
-                }
-                else {
-                    if (skipped == null)
-                        skipped = new GridLeanSet<>();
-
-                    skipped.add(key);
-                }
-            }
-
-            assert pessimistic();
-
-            Collection<K> keys = F.viewReadOnly(entries, CU.<K, V>entry2Key());
-
-            // Acquire locks only after having added operation to the write 
set.
-            // Otherwise, during rollback we will not know whether locks need
-            // to be rolled back.
-            // Loose all skipped and previously locked (we cannot reenter 
locks here).
-            final Collection<? extends K> passedKeys = skipped != null ? 
F.view(keys, F0.notIn(skipped)) : keys;
-
-            if (log.isDebugEnabled())
-                log.debug("Lock keys: " + passedKeys);
-
-            return obtainLockAsync(cacheCtx, ret, passedKeys, read, skipped, 
accessTtl, null);
-        }
-        catch (IgniteCheckedException e) {
-            setRollbackOnly();
-
-            return new GridFinishedFuture<>(cctx.kernalContext(), e);
-        }
-    }
-
-    /**
-     * @param cacheCtx Context.
-     * @param ret Return value.
-     * @param passedKeys Passed keys.
-     * @param read {@code True} if read.
-     * @param skipped Skipped keys.
-     * @param accessTtl TTL for read operation.
-     * @param filter Entry write filter.
-     * @return Future for lock acquisition.
-     */
-    private IgniteFuture<GridCacheReturn<V>> obtainLockAsync(
-        final GridCacheContext<K, V> cacheCtx,
-        GridCacheReturn<V> ret,
-        final Collection<? extends K> passedKeys,
-        final boolean read,
-        final Set<K> skipped,
-        final long accessTtl,
-        @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
-        if (log.isDebugEnabled())
-            log.debug("Before acquiring transaction lock on keys [passedKeys=" 
+ passedKeys + ", skipped=" +
-                skipped + ']');
-
-        if (passedKeys.isEmpty())
-            return new GridFinishedFuture<>(cctx.kernalContext(), ret);
-
-        GridDhtTransactionalCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? 
cacheCtx.nearTx().dht() : cacheCtx.dhtTx();
-
-        IgniteFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys,
-            lockTimeout(),
-            this,
-            isInvalidate(),
-            read,
-            /*retval*/false,
-            isolation,
-            accessTtl,
-            CU.<K, V>empty());
-
-        return new GridEmbeddedFuture<>(
-            fut,
-            new PLC1<GridCacheReturn<V>>(ret) {
-                @Override protected GridCacheReturn<V> 
postLock(GridCacheReturn<V> ret) throws IgniteCheckedException {
-                    if (log.isDebugEnabled())
-                        log.debug("Acquired transaction lock on keys: " + 
passedKeys);
-
-                    postLockWrite(cacheCtx,
-                        passedKeys,
-                        skipped,
-                        ret,
-                        /*remove*/false,
-                        /*retval*/false,
-                        /*read*/read,
-                        accessTtl,
-                        filter == null ? CU.<K, V>empty() : filter,
-                        /**computeInvoke*/false);
-
-                    return ret;
-                }
-            },
-            cctx.kernalContext());
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void addGroupTxMapping(Collection<IgniteTxKey<K>> 
keys) {
-        assert groupLock();
-
-        for (GridDistributedTxMapping<K, V> mapping : dhtMap.values())
-            
mapping.entries(Collections.unmodifiableCollection(txMap.values()), true);
-
-        // Here we know that affinity key for all given keys is our group lock 
key.
-        // Just add entries to dht mapping.
-        // Add near readers. If near cache is disabled on all nodes, do 
nothing.
-        Collection<UUID> backupIds = dhtMap.keySet();
-
-        Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> locNearMap = null;
-
-        for (IgniteTxKey<K> key : keys) {
-            IgniteTxEntry<K, V> txEntry = entry(key);
-
-            if (!txEntry.groupLockEntry() || txEntry.context().isNear())
-                continue;
-
-            assert txEntry.cached() instanceof GridDhtCacheEntry : "Invalid 
entry type: " + txEntry.cached();
-
-            while (true) {
-                try {
-                    GridDhtCacheEntry<K, V> entry = (GridDhtCacheEntry<K, 
V>)txEntry.cached();
-
-                    Collection<UUID> readers = entry.readers();
-
-                    if (!F.isEmpty(readers)) {
-                        Collection<ClusterNode> nearNodes = 
cctx.discovery().nodes(readers, F0.notEqualTo(nearNodeId()),
-                            F.notIn(backupIds));
-
-                        if (log.isDebugEnabled())
-                            log.debug("Mapping entry to near nodes [nodes=" + 
U.nodeIds(nearNodes) + ", entry=" +
-                                entry + ']');
-
-                        for (ClusterNode n : nearNodes) {
-                            if (locNearMap == null)
-                                locNearMap = new HashMap<>();
-
-                            List<GridDhtCacheEntry<K, V>> entries = 
locNearMap.get(n);
-
-                            if (entries == null)
-                                locNearMap.put(n, entries = new 
LinkedList<>());
-
-                            entries.add(entry);
-                        }
-                    }
-
-                    break;
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    // Retry.
-                    txEntry.cached(txEntry.context().dht().entryExx(key.key(), 
topologyVersion()), txEntry.keyBytes());
-                }
-            }
-        }
-
-        if (locNearMap != null)
-            addNearMapping(locNearMap);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
-    @Override public boolean finish(boolean commit) throws 
IgniteCheckedException {
-        if (log.isDebugEnabled())
-            log.debug("Finishing dht local tx [tx=" + this + ", commit=" + 
commit + "]");
-
-        if (optimistic())
-            state(PREPARED);
-
-        if (commit) {
-            if (!state(COMMITTING)) {
-                IgniteTxState state = state();
-
-                if (state != COMMITTING && state != COMMITTED)
-                    throw new IgniteCheckedException("Invalid transaction 
state for commit [state=" + state() +
-                        ", tx=" + this + ']');
-                else {
-                    if (log.isDebugEnabled())
-                        log.debug("Invalid transaction state for commit 
(another thread is committing): " + this);
-
-                    return false;
-                }
-            }
-        }
-        else {
-            if (!state(ROLLING_BACK)) {
-                if (log.isDebugEnabled())
-                    log.debug("Invalid transaction state for rollback [state=" 
+ state() + ", tx=" + this + ']');
-
-                return false;
-            }
-        }
-
-        IgniteCheckedException err = null;
-
-        // Commit to DB first. This way if there is a failure, transaction
-        // won't be committed.
-        try {
-            if (commit && !isRollbackOnly())
-                userCommit();
-            else
-                userRollback();
-        }
-        catch (IgniteCheckedException e) {
-            err = e;
-
-            commit = false;
-
-            // If heuristic error.
-            if (!isRollbackOnly()) {
-                systemInvalidate(true);
-
-                U.warn(log, "Set transaction invalidation flag to true due to 
error [tx=" + CU.txString(this) +
-                    ", err=" + err + ']');
-            }
-        }
-
-        if (err != null) {
-            state(UNKNOWN);
-
-            throw err;
-        }
-        else {
-            // Committed state will be set in finish future onDone callback.
-            if (commit) {
-                if (!onePhaseCommit()) {
-                    if (!state(COMMITTED)) {
-                        state(UNKNOWN);
-
-                        throw new IgniteCheckedException("Invalid transaction 
state for commit: " + this);
-                    }
-                }
-            }
-            else {
-                if (!state(ROLLED_BACK)) {
-                    state(UNKNOWN);
-
-                    throw new IgniteCheckedException("Invalid transaction 
state for rollback: " + this);
-                }
-            }
-        }
-
-        return true;
-    }
-
-    /**
-     * Removes previously created prepare future from atomic reference.
-     *
-     * @param fut Expected future.
-     */
-    protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture<K, V> 
fut);
-
-    /** {@inheritDoc} */
-    @Override public void rollback() throws IgniteCheckedException {
-        try {
-            rollbackAsync().get();
-        }
-        finally {
-            cctx.tm().txContextReset();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, 
"nearNodes", nearMap.keySet(),
-            "dhtNodes", dhtMap.keySet(), "super", super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxMapping.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxMapping.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxMapping.java
deleted file mode 100644
index 1e58edd..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxMapping.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * DHT transaction mapping.
- */
-public class GridDhtTxMapping<K, V> {
-    /** Transaction nodes mapping (primary node -> related backup nodes). */
-    private final Map<UUID, Collection<UUID>> txNodes = new GridLeanMap<>();
-
-    /** */
-    private final List<TxMapping> mappings = new ArrayList<>();
-
-    /** */
-    private TxMapping last;
-
-    /**
-     * Adds information about next mapping.
-     *
-     * @param nodes Nodes.
-     */
-    @SuppressWarnings("ConstantConditions")
-    public void addMapping(List<ClusterNode> nodes) {
-        ClusterNode primary = F.first(nodes);
-
-        Collection<ClusterNode> backups = F.view(nodes, F.notEqualTo(primary));
-
-        if (last == null || !last.primary.equals(primary.id())) {
-            last = new TxMapping(primary, backups);
-
-            mappings.add(last);
-        }
-        else
-            last.add(backups);
-
-        Collection<UUID> storedBackups = txNodes.get(last.primary);
-
-        if (storedBackups == null)
-            txNodes.put(last.primary, storedBackups = new HashSet<>());
-
-        storedBackups.addAll(last.backups);
-    }
-
-    /**
-     * @return Primary to backup mapping.
-     */
-    public Map<UUID, Collection<UUID>> transactionNodes() {
-        return txNodes;
-    }
-
-    /**
-     * For each mapping sets flags indicating if mapping is last for node.
-     *
-     * @param mappings Mappings.
-     */
-    public void initLast(Collection<GridDistributedTxMapping<K, V>> mappings) {
-        assert this.mappings.size() == mappings.size();
-
-        int idx = 0;
-
-        for (GridDistributedTxMapping<?, ?> map : mappings) {
-            TxMapping mapping = this.mappings.get(idx);
-
-            map.lastBackups(lastBackups(mapping, idx));
-
-            boolean last = true;
-
-            for (int i = idx + 1; i < this.mappings.size(); i++) {
-                TxMapping nextMap = this.mappings.get(i);
-
-                if (nextMap.primary.equals(mapping.primary)) {
-                    last = false;
-
-                    break;
-                }
-            }
-
-            map.last(last);
-
-            idx++;
-        }
-    }
-
-    /**
-     * @param mapping Mapping.
-     * @param idx Mapping index.
-     * @return IDs of backup nodes receiving last prepare request during this 
mapping.
-     */
-    @Nullable private Collection<UUID> lastBackups(TxMapping mapping, int idx) 
{
-        Collection<UUID> res = null;
-
-        for (UUID backup : mapping.backups) {
-            boolean foundNext = false;
-
-            for (int i = idx + 1; i < mappings.size(); i++) {
-                TxMapping nextMap = mappings.get(i);
-
-                if (nextMap.primary.equals(mapping.primary) && 
nextMap.backups.contains(backup)) {
-                    foundNext = true;
-
-                    break;
-                }
-            }
-
-            if (!foundNext) {
-                if (res == null)
-                    res = new ArrayList<>(mapping.backups.size());
-
-                res.add(backup);
-            }
-        }
-
-        return res;
-    }
-
-    /**
-     */
-    private static class TxMapping {
-        /** */
-        private final UUID primary;
-
-        /** */
-        private final Set<UUID> backups;
-
-        /**
-         * @param primary Primary node.
-         * @param backups Backup nodes.
-         */
-        private TxMapping(ClusterNode primary, Iterable<ClusterNode> backups) {
-            this.primary = primary.id();
-
-            this.backups = new HashSet<>();
-
-            add(backups);
-        }
-
-        /**
-         * @param backups Backup nodes.
-         */
-        private void add(Iterable<ClusterNode> backups) {
-            for (ClusterNode n : backups)
-                this.backups.add(n.id());
-        }
-    }
-}

Reply via email to