IGNITE-264 - WIP

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9b0bd71b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9b0bd71b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9b0bd71b

Branch: refs/heads/ignite-264
Commit: 9b0bd71b5844af5fd3a89655c1e3833e3e4cfcc4
Parents: 4bf51e7
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Thu Feb 26 11:20:34 2015 -0800
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Thu Feb 26 11:20:34 2015 -0800

----------------------------------------------------------------------
 ...ridCacheOptimisticCheckPreparedTxFuture.java | 38 ++++++++--
 ...idCacheOptimisticCheckPreparedTxRequest.java | 44 +++++++++--
 .../dht/GridClientPartitionTopology.java        | 20 +++++
 .../dht/GridDhtPartitionTopology.java           |  7 ++
 .../dht/GridDhtPartitionTopologyImpl.java       | 20 +++++
 .../distributed/dht/GridDhtTxFinishFuture.java  |  5 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java | 33 ++++++--
 .../cache/distributed/dht/GridDhtTxRemote.java  |  2 +-
 .../cache/transactions/IgniteTxAdapter.java     |  1 +
 .../cache/transactions/IgniteTxEntry.java       | 19 ++++-
 .../cache/transactions/IgniteTxHandler.java     | 80 ++++++++++++++++----
 .../cache/transactions/IgniteTxManager.java     | 23 ++++++
 .../GridTransactionalCacheQueueImpl.java        | 14 ++--
 .../jdk8/backport/ConcurrentLinkedHashMap.java  |  7 ++
 .../GridConcurrentLinkedHashMapSelfTest.java    | 18 +++++
 15 files changed, 287 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index 75b2683..429023b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.*;
@@ -155,7 +156,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> 
extends GridCompound
 
                     GridCacheOptimisticCheckPreparedTxRequest<K, V>
                         req = new 
GridCacheOptimisticCheckPreparedTxRequest<>(tx,
-                        nodeTransactions(id), futureId(), fut.futureId());
+                        nodeTransactions(id), futureId(), fut.futureId(), 
false);
 
                     try {
                         cctx.io().send(id, req, tx.ioPolicy());
@@ -176,7 +177,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> 
extends GridCompound
                 add(fut);
 
                 GridCacheOptimisticCheckPreparedTxRequest<K, V> req = new 
GridCacheOptimisticCheckPreparedTxRequest<>(
-                    tx, nodeTransactions(nodeId), futureId(), fut.futureId());
+                    tx, nodeTransactions(nodeId), futureId(), fut.futureId(), 
false);
 
                 try {
                     cctx.io().send(nodeId, req, tx.ioPolicy());
@@ -192,6 +193,32 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, 
V> extends GridCompound
             }
         }
 
+        // Specifically check originating near node.
+        if (tx instanceof GridDhtTxRemote) {
+            UUID nearNodeId = ((GridDhtTxRemote)tx).nearNodeId();
+
+            if (cctx.localNodeId().equals(nearNodeId))
+                add(cctx.tm().nearTxCommitted(tx.nearXidVersion()));
+            else {
+                MiniFuture fut = new MiniFuture(nearNodeId);
+
+                add(fut);
+
+                GridCacheOptimisticCheckPreparedTxRequest<K, V> req = new 
GridCacheOptimisticCheckPreparedTxRequest<>(
+                    tx, 1, futureId(), fut.futureId(), true);
+
+                try {
+                    cctx.io().send(nearNodeId, req, tx.ioPolicy());
+                }
+                catch (ClusterTopologyCheckedException ignored) {
+                    fut.onNodeLeft();
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onError(e);
+                }
+            }
+        }
+
         markInitialized();
     }
 
@@ -258,14 +285,11 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, 
V> extends GridCompound
             if (isMini(fut)) {
                 MiniFuture f = (MiniFuture)fut;
 
-                if (f.nodeId().equals(nodeId)) {
+                if (f.nodeId().equals(nodeId))
                     f.onNodeLeft();
-
-                    return true;
-                }
             }
 
-        return false;
+        return true;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
index 636c5b5..4b61b7e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
@@ -49,6 +49,9 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> 
extends GridDistrib
     /** System transaction flag. */
     private boolean sys;
 
+    /** Near check falg. */
+    private boolean nearCheck;
+
     /**
      * Empty constructor required by {@link Externalizable}
      */
@@ -62,8 +65,13 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> 
extends GridDistrib
      * @param futId Future ID.
      * @param miniId Mini future ID.
      */
-    public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx<K, V> 
tx, int txNum, IgniteUuid futId,
-        IgniteUuid miniId) {
+    public GridCacheOptimisticCheckPreparedTxRequest(
+        IgniteInternalTx<K, V> tx,
+        int txNum,
+        IgniteUuid futId,
+        IgniteUuid miniId,
+        boolean nearCheck
+    ) {
         super(tx.xidVersion(), 0);
 
         nearXidVer = tx.nearXidVersion();
@@ -72,6 +80,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> 
extends GridDistrib
         this.futId = futId;
         this.miniId = miniId;
         this.txNum = txNum;
+        this.nearCheck = nearCheck;
     }
 
     /**
@@ -109,6 +118,13 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, 
V> extends GridDistrib
         return sys;
     }
 
+    /**
+     * @return Near check flag.
+     */
+    public boolean nearCheck() {
+        return nearCheck;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
@@ -137,18 +153,24 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, 
V> extends GridDistrib
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeMessage("nearXidVer", nearXidVer))
+                if (!writer.writeBoolean("nearCheck", nearCheck))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeBoolean("sys", sys))
+                if (!writer.writeMessage("nearXidVer", nearXidVer))
                     return false;
 
                 writer.incrementState();
 
             case 10:
+                if (!writer.writeBoolean("sys", sys))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
                 if (!writer.writeInt("txNum", txNum))
                     return false;
 
@@ -187,7 +209,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, 
V> extends GridDistrib
                 reader.incrementState();
 
             case 8:
-                nearXidVer = reader.readMessage("nearXidVer");
+                nearCheck = reader.readBoolean("nearCheck");
 
                 if (!reader.isLastRead())
                     return false;
@@ -195,7 +217,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, 
V> extends GridDistrib
                 reader.incrementState();
 
             case 9:
-                sys = reader.readBoolean("sys");
+                nearXidVer = reader.readMessage("nearXidVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -203,6 +225,14 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, 
V> extends GridDistrib
                 reader.incrementState();
 
             case 10:
+                sys = reader.readBoolean("sys");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
                 txNum = reader.readInt("txNum");
 
                 if (!reader.isLastRead())
@@ -222,7 +252,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, 
V> extends GridDistrib
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 12;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 239efc3..98fbd47 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -304,6 +304,26 @@ public class GridClientPartitionTopology<K, V> implements 
GridDhtPartitionTopolo
     }
 
     /** {@inheritDoc} */
+    @Override public GridDhtPartitionState partitionState(UUID nodeId, int 
part) {
+        lock.readLock().lock();
+
+        try {
+            GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+            if (partMap != null) {
+                GridDhtPartitionState state = partMap.get(part);
+
+                return state == null ? EVICTED : state;
+            }
+
+            return EVICTED;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<ClusterNode> nodes(int p, long topVer) {
         lock.readLock().lock();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d9a20ae..314d70e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -118,6 +118,13 @@ public interface GridDhtPartitionTopology<K, V> {
     public GridDhtPartitionMap localPartitionMap();
 
     /**
+     * @param nodeId Node ID.
+     * @param part Partition.
+     * @return Partition state.
+     */
+    public GridDhtPartitionState partitionState(UUID nodeId, int part);
+
+    /**
      * @return Current update sequence.
      */
     public long updateSequence();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index acf00eb..40fde60 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -572,6 +572,26 @@ class GridDhtPartitionTopologyImpl<K, V> implements 
GridDhtPartitionTopology<K,
     }
 
     /** {@inheritDoc} */
+    @Override public GridDhtPartitionState partitionState(UUID nodeId, int 
part) {
+        lock.readLock().lock();
+
+        try {
+            GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+            if (partMap != null) {
+                GridDhtPartitionState state = partMap.get(part);
+
+                return state == null ? EVICTED : state;
+            }
+
+            return EVICTED;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<ClusterNode> nodes(int p, long topVer) {
         Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index bb80480..dc4d15c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -325,6 +325,8 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur
                 tx.subjectId(),
                 tx.taskNameHash());
 
+            req.writeVersion(tx.writeVersion());
+
             try {
                 cctx.io().send(n, req, tx.ioPolicy());
 
@@ -371,8 +373,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur
                     tx.subjectId(),
                     tx.taskNameHash());
 
-                if (tx.onePhaseCommit())
-                    req.writeVersion(tx.writeVersion());
+                req.writeVersion(tx.writeVersion());
 
                 try {
                     cctx.io().send(nearMapping.node(), req, tx.ioPolicy());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 2b1d2f0..33ee64c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -294,7 +294,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
 
                 boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && 
!F.isAlwaysTrue(txEntry.filters());
 
-                if (hasFilters || retVal || txEntry.op() == 
GridCacheOperation.DELETE) {
+                if (hasFilters || retVal || txEntry.op() == DELETE || 
txEntry.op() == TRANSFORM) {
                     cached.unswap(true, retVal);
 
                     V val = cached.innerGet(
@@ -311,14 +311,13 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
                         null,
                         null);
 
-                    if (retVal) {
+                    if (retVal || txEntry.op() == TRANSFORM) {
                         if (!F.isEmpty(txEntry.entryProcessors())) {
                             K key = txEntry.key();
 
                             Object procRes = null;
                             Exception err = null;
 
-
                             for (T2<EntryProcessor<K, V, ?>, Object[]> t : 
txEntry.entryProcessors()) {
                                 try {
                                     CacheInvokeEntry<K, V> invokeEntry = new 
CacheInvokeEntry<>(txEntry.context(), key, val);
@@ -336,6 +335,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
                                 }
                             }
 
+                            txEntry.entryProcessorCalculatedValue(val);
+
                             if (err != null || procRes != null)
                                 ret.addEntryProcessResult(key,
                                     err == null ? new 
CacheInvokeResult<>(procRes) : new CacheInvokeResult<>(err));
@@ -360,7 +361,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
                         ret.success(false);
                     }
                     else
-                        ret.success(txEntry.op() != GridCacheOperation.DELETE 
|| cached.hasValue());
+                        ret.success(txEntry.op() != DELETE || 
cached.hasValue());
                 }
             }
             catch (IgniteCheckedException e) {
@@ -1002,7 +1003,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
     private boolean map(
         IgniteTxEntry<K, V> entry,
         Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap,
-        Map<UUID, GridDistributedTxMapping<K, V>> futNearMap) {
+        Map<UUID, GridDistributedTxMapping<K, V>> futNearMap
+    ) {
         if (entry.cached().isLocal())
             return false;
 
@@ -1069,14 +1071,31 @@ public final class GridDhtTxPrepareFuture<K, V> extends 
GridCompoundIdentityFutu
      * @param locMap Exclude map.
      * @return {@code True} if mapped.
      */
-    private boolean map(IgniteTxEntry<K, V> entry, Iterable<ClusterNode> nodes,
-        Map<UUID, GridDistributedTxMapping<K, V>> globalMap, Map<UUID, 
GridDistributedTxMapping<K, V>> locMap) {
+    private boolean map(
+        IgniteTxEntry<K, V> entry,
+        Iterable<ClusterNode> nodes,
+        Map<UUID, GridDistributedTxMapping<K, V>> globalMap,
+        Map<UUID, GridDistributedTxMapping<K, V>> locMap
+    ) {
         boolean ret = false;
 
         if (nodes != null) {
             for (ClusterNode n : nodes) {
                 GridDistributedTxMapping<K, V> global = globalMap.get(n.id());
 
+                if (!F.isEmpty(entry.entryProcessors())) {
+                    GridDhtPartitionState state = 
entry.context().topology().partitionState(n.id(),
+                        entry.cached().partition());
+
+                    if (state != GridDhtPartitionState.OWNING && state != 
GridDhtPartitionState.EVICTED) {
+                        V procVal = entry.entryProcessorCalculatedValue();
+
+                        entry.op(procVal == null ? DELETE : UPDATE);
+                        entry.value(procVal, true, false);
+                        entry.entryProcessors(null);
+                    }
+                }
+
                 if (global == null)
                     globalMap.put(n.id(), global = new 
GridDistributedTxMapping<>(n));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 3366033..ed37ae4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -203,7 +203,7 @@ public class GridDhtTxRemote<K, V> extends 
GridDistributedTxRemoteAdapter<K, V>
     /**
      * @return Near node ID.
      */
-    UUID nearNodeId() {
+    public UUID nearNodeId() {
         return nearNodeId;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 8410ef7..e11a5b2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -160,6 +160,7 @@ public abstract class IgniteTxAdapter<K, V> extends 
GridMetadataAwareAdapter
     private AtomicBoolean preparing = new AtomicBoolean();
 
     /** */
+    @GridToStringInclude
     private Set<Integer> invalidParts = new GridLeanSet<>();
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 6bb768c..286f896 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -83,6 +83,9 @@ public class IgniteTxEntry<K, V> implements 
GridPeerDeployAware, Externalizable,
     @GridToStringInclude
     private Collection<T2<EntryProcessor<K, V, ?>, Object[]>> 
entryProcessorsCol;
 
+    /** Transient field for calculated entry processor value. */
+    private V entryProcessorCalcVal;
+
     /** Transform closure bytes. */
     @GridToStringExclude
     private byte[] transformClosBytes;
@@ -420,6 +423,20 @@ public class IgniteTxEntry<K, V> implements 
GridPeerDeployAware, Externalizable,
     }
 
     /**
+     * @return Entry processor calculated value.
+     */
+    public V entryProcessorCalculatedValue() {
+        return entryProcessorCalcVal;
+    }
+
+    /**
+     * @param entryProcessorCalcVal Entry processor calculated value.
+     */
+    public void entryProcessorCalculatedValue(V entryProcessorCalcVal) {
+        this.entryProcessorCalcVal = entryProcessorCalcVal;
+    }
+
+    /**
      * @return Underlying cache entry.
      */
     public GridCacheEntryEx<K, V> cached() {
@@ -902,7 +919,7 @@ public class IgniteTxEntry<K, V> implements 
GridPeerDeployAware, Externalizable,
     @Override public String toString() {
         return GridToStringBuilder.toString(IgniteTxEntry.class, this,
             "keyBytesSize", keyBytes == null ? "null" : 
Integer.toString(keyBytes.length),
-            "xidVer", tx == null ? "null" : tx.xidVersion());
+            "xidVer", tx == null ? "null" : tx.xidVersion(), "hc", 
System.identityHashCode(this));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 90af0b2..ca58d6b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -798,15 +798,14 @@ public class IgniteTxHandler<K, V> {
 
         try {
             if (req.commit() || req.isSystemInvalidate()) {
-                if (tx.commitVersion(req.commitVersion())) {
-                    tx.invalidate(req.isInvalidate());
-                    tx.systemInvalidate(req.isSystemInvalidate());
+                tx.commitVersion(req.commitVersion());
+                tx.invalidate(req.isInvalidate());
+                tx.systemInvalidate(req.isSystemInvalidate());
 
-                    // Complete remote candidates.
-                    tx.doneRemote(req.version());
+                // Complete remote candidates.
+                tx.doneRemote(req.version());
 
-                    tx.commit();
-                }
+                tx.commit();
             }
             else {
                 tx.doneRemote(req.version());
@@ -986,8 +985,14 @@ public class IgniteTxHandler<K, V> {
             // in prepare phase will get properly ordered as well.
             tx.prepare();
 
-            if (req.last())
+            if (req.last()) {
+                assert !F.isEmpty(req.transactionNodes()) :
+                    "Received last prepare request with empty transaction 
nodes: " + req;
+
+                tx.transactionNodes(req.transactionNodes());
+
                 tx.state(PREPARED);
+            }
 
             res.invalidPartitions(tx.invalidPartitions());
 
@@ -1085,20 +1090,69 @@ public class IgniteTxHandler<K, V> {
      * @param nodeId Node ID.
      * @param req Request.
      */
-    protected void processCheckPreparedTxRequest(UUID nodeId, 
GridCacheOptimisticCheckPreparedTxRequest<K, V> req) {
+    protected void processCheckPreparedTxRequest(
+        final UUID nodeId,
+        final GridCacheOptimisticCheckPreparedTxRequest<K, V> req
+    ) {
         if (log.isDebugEnabled())
             log.debug("Processing check prepared transaction requests 
[nodeId=" + nodeId + ", req=" + req + ']');
 
-        boolean prepared = 
ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
+        if (req.nearCheck()) {
+            IgniteInternalFuture<Boolean> fut = 
ctx.tm().nearTxCommitted(req.nearXidVersion());
+
+            fut.listenAsync(new CI1<IgniteInternalFuture<Boolean>>() {
+                @Override public void apply(IgniteInternalFuture<Boolean> f) {
+                    try {
+                        boolean prepared = f.get();
 
-        GridCacheOptimisticCheckPreparedTxResponse<K, V> res =
-            new GridCacheOptimisticCheckPreparedTxResponse<>(req.version(), 
req.futureId(), req.miniId(), prepared);
+                        sendCheckPrepareTxResponse(nodeId,
+                            new GridCacheOptimisticCheckPreparedTxResponse<K, 
V>(
+                                req.version(),
+                                req.futureId(),
+                                req.miniId(),
+                                prepared),
+                            req.system());
 
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to wait for transaction check 
prepared future " +
+                            "(will send rolled back response): " + 
req.nearXidVersion(), e);
+
+                        sendCheckPrepareTxResponse(nodeId,
+                            new GridCacheOptimisticCheckPreparedTxResponse<K, 
V>(
+                                req.version(),
+                                req.futureId(),
+                                req.miniId(),
+                                false),
+                            req.system());
+                    }
+                }
+            });
+        }
+        else {
+            boolean prepared = 
ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
+
+            sendCheckPrepareTxResponse(nodeId,
+                new GridCacheOptimisticCheckPreparedTxResponse<K, 
V>(req.version(), req.futureId(), req.miniId(), prepared),
+                req.system());
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response to send.
+     * @param sys System pool flag.
+     */
+    private void sendCheckPrepareTxResponse(
+        UUID nodeId,
+        GridCacheOptimisticCheckPreparedTxResponse<K, V> res,
+        boolean sys
+    ) {
         try {
             if (log.isDebugEnabled())
                 log.debug("Sending check prepared transaction response 
[nodeId=" + nodeId + ", res=" + res + ']');
 
-            ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : 
SYSTEM_POOL);
+            ctx.io().send(nodeId, res, sys ? UTILITY_CACHE_POOL : SYSTEM_POOL);
         }
         catch (ClusterTopologyCheckedException ignored) {
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index e347cce..974144a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1653,6 +1653,29 @@ public class IgniteTxManager<K, V> extends 
GridCacheSharedManagerAdapter<K, V> {
     }
 
     /**
+     * @param nearVer Near version to check.
+     * @return Future.
+     */
+    public IgniteInternalFuture<Boolean> nearTxCommitted(GridCacheVersion 
nearVer) {
+        for (final IgniteInternalTx<K, V> tx : txs()) {
+            if (tx.near() && tx.xidVersion().equals(nearVer)) {
+                return tx.done() ?
+                    new GridFinishedFutureEx<>(tx.state() == COMMITTED) :
+                    tx.finishFuture().chain(new 
C1<IgniteInternalFuture<IgniteInternalTx>, Boolean>() {
+                        @Override public Boolean 
apply(IgniteInternalFuture<IgniteInternalTx> f) {
+                            return tx.state() == COMMITTED;
+                        }
+                    });
+            }
+        }
+
+        // Transaction was not found. Check committed versions buffer.
+        Boolean res = completedVers.get(nearVer);
+
+        return new GridFinishedFutureEx<>(res != null && res);
+    }
+
+    /**
      * Gets local transaction for pessimistic tx recovery.
      *
      * @param nearXidVer Near tx ID.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index 801e27f..37817c4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -21,7 +21,9 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.transactions.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -30,7 +32,7 @@ import static 
org.apache.ignite.transactions.TransactionConcurrency.*;
 import static org.apache.ignite.transactions.TransactionIsolation.*;
 
 /**
- * {@link org.apache.ignite.IgniteQueue} implementation using transactional 
cache.
+ * {@link IgniteQueue} implementation using transactional cache.
  */
 public class GridTransactionalCacheQueueImpl<T> extends 
GridCacheQueueAdapter<T> {
     /**
@@ -72,7 +74,7 @@ public class GridTransactionalCacheQueueImpl<T> extends 
GridCacheQueueAdapter<T>
                         break;
                     }
                 }
-                catch (ClusterTopologyCheckedException e) {
+                catch (ClusterTopologyCheckedException | 
TransactionRollbackException | IgniteTxRollbackCheckedException e) {
                     if (e instanceof ClusterGroupEmptyCheckedException)
                         throw e;
 
@@ -119,14 +121,14 @@ public class GridTransactionalCacheQueueImpl<T> extends 
GridCacheQueueAdapter<T>
 
                     break;
                 }
-                catch (ClusterTopologyCheckedException e) {
+                catch (ClusterTopologyCheckedException | 
TransactionRollbackException | IgniteTxRollbackCheckedException e) {
                     if (e instanceof ClusterGroupEmptyCheckedException)
                         throw e;
 
                     if (cnt++ == MAX_UPDATE_RETRIES)
                         throw e;
                     else {
-                        U.warn(log, "Failed to add item, will retry [err=" + e 
+ ']');
+                        U.warn(log, "Failed to poll item, will retry [err=" + 
e + ']');
 
                         U.sleep(RETRY_DELAY);
                     }
@@ -176,7 +178,7 @@ public class GridTransactionalCacheQueueImpl<T> extends 
GridCacheQueueAdapter<T>
 
                     break;
                 }
-                catch (ClusterTopologyCheckedException e) {
+                catch (ClusterTopologyCheckedException | 
TransactionRollbackException | IgniteTxRollbackCheckedException e) {
                     if (e instanceof ClusterGroupEmptyCheckedException)
                         throw e;
 
@@ -219,7 +221,7 @@ public class GridTransactionalCacheQueueImpl<T> extends 
GridCacheQueueAdapter<T>
 
                     break;
                 }
-                catch (ClusterTopologyCheckedException e) {
+                catch (ClusterTopologyCheckedException | 
TransactionRollbackException | IgniteTxRollbackCheckedException e) {
                     if (e instanceof ClusterGroupEmptyCheckedException)
                         throw e;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/jdk8/backport/ConcurrentLinkedHashMap.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/jdk8/backport/ConcurrentLinkedHashMap.java 
b/modules/core/src/main/java/org/jdk8/backport/ConcurrentLinkedHashMap.java
index 8992e77..e3f16a2 100644
--- a/modules/core/src/main/java/org/jdk8/backport/ConcurrentLinkedHashMap.java
+++ b/modules/core/src/main/java/org/jdk8/backport/ConcurrentLinkedHashMap.java
@@ -724,6 +724,13 @@ public class ConcurrentLinkedHashMap<K, V> extends 
AbstractMap<K, V> implements
                     if (!onlyIfAbsent) {
                         e.val = val;
 
+                        HashEntry<K, V> qEntry = (HashEntry<K, 
V>)e.node.item();
+
+                        if (qEntry != null && qEntry != e)
+                            qEntry.val = val;
+
+                        ((HashEntry<K, V>)e.node.item).val = val;
+
                         modified = true;
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java
index 75ac896..49f24b0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java
@@ -176,6 +176,24 @@ public class GridConcurrentLinkedHashMapSelfTest extends 
GridCommonAbstractTest
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testRehash() throws Exception {
+        Map<Integer, Date> map = new ConcurrentLinkedHashMap<>(10);
+
+        for (int i = 0; i < 100; i++)
+            // Will initiate rehash in the middle.
+            map.put(i, new Date(0));
+
+        for (int i = 0; i < 100; i++)
+            // Will initiate rehash in the middle.
+            map.put(i, new Date(1));
+
+        for (Date date : map.values())
+            assertEquals(1L, date.getTime());
+    }
+
+    /**
      *
      */
     public void testDescendingMethods() {

Reply via email to