IGNITE-264 - WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9b0bd71b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9b0bd71b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9b0bd71b Branch: refs/heads/ignite-264 Commit: 9b0bd71b5844af5fd3a89655c1e3833e3e4cfcc4 Parents: 4bf51e7 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Thu Feb 26 11:20:34 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Thu Feb 26 11:20:34 2015 -0800 ---------------------------------------------------------------------- ...ridCacheOptimisticCheckPreparedTxFuture.java | 38 ++++++++-- ...idCacheOptimisticCheckPreparedTxRequest.java | 44 +++++++++-- .../dht/GridClientPartitionTopology.java | 20 +++++ .../dht/GridDhtPartitionTopology.java | 7 ++ .../dht/GridDhtPartitionTopologyImpl.java | 20 +++++ .../distributed/dht/GridDhtTxFinishFuture.java | 5 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 33 ++++++-- .../cache/distributed/dht/GridDhtTxRemote.java | 2 +- .../cache/transactions/IgniteTxAdapter.java | 1 + .../cache/transactions/IgniteTxEntry.java | 19 ++++- .../cache/transactions/IgniteTxHandler.java | 80 ++++++++++++++++---- .../cache/transactions/IgniteTxManager.java | 23 ++++++ .../GridTransactionalCacheQueueImpl.java | 14 ++-- .../jdk8/backport/ConcurrentLinkedHashMap.java | 7 ++ .../GridConcurrentLinkedHashMapSelfTest.java | 18 +++++ 15 files changed, 287 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java index 75b2683..429023b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; @@ -155,7 +156,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound GridCacheOptimisticCheckPreparedTxRequest<K, V> req = new GridCacheOptimisticCheckPreparedTxRequest<>(tx, - nodeTransactions(id), futureId(), fut.futureId()); + nodeTransactions(id), futureId(), fut.futureId(), false); try { cctx.io().send(id, req, tx.ioPolicy()); @@ -176,7 +177,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound add(fut); GridCacheOptimisticCheckPreparedTxRequest<K, V> req = new GridCacheOptimisticCheckPreparedTxRequest<>( - tx, nodeTransactions(nodeId), futureId(), fut.futureId()); + tx, nodeTransactions(nodeId), futureId(), fut.futureId(), false); try { cctx.io().send(nodeId, req, tx.ioPolicy()); @@ -192,6 +193,32 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound } } + // Specifically check originating near node. 
+ if (tx instanceof GridDhtTxRemote) { + UUID nearNodeId = ((GridDhtTxRemote)tx).nearNodeId(); + + if (cctx.localNodeId().equals(nearNodeId)) + add(cctx.tm().nearTxCommitted(tx.nearXidVersion())); + else { + MiniFuture fut = new MiniFuture(nearNodeId); + + add(fut); + + GridCacheOptimisticCheckPreparedTxRequest<K, V> req = new GridCacheOptimisticCheckPreparedTxRequest<>( + tx, 1, futureId(), fut.futureId(), true); + + try { + cctx.io().send(nearNodeId, req, tx.ioPolicy()); + } + catch (ClusterTopologyCheckedException ignored) { + fut.onNodeLeft(); + } + catch (IgniteCheckedException e) { + fut.onError(e); + } + } + } + markInitialized(); } @@ -258,14 +285,11 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; - if (f.nodeId().equals(nodeId)) { + if (f.nodeId().equals(nodeId)) f.onNodeLeft(); - - return true; - } } - return false; + return true; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java index 636c5b5..4b61b7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java @@ -49,6 +49,9 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib /** System transaction flag. */ private boolean sys; + /** Near check flag.
*/ + private boolean nearCheck; + /** * Empty constructor required by {@link Externalizable} */ @@ -62,8 +65,13 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib * @param futId Future ID. * @param miniId Mini future ID. */ - public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx<K, V> tx, int txNum, IgniteUuid futId, - IgniteUuid miniId) { + public GridCacheOptimisticCheckPreparedTxRequest( + IgniteInternalTx<K, V> tx, + int txNum, + IgniteUuid futId, + IgniteUuid miniId, + boolean nearCheck + ) { super(tx.xidVersion(), 0); nearXidVer = tx.nearXidVersion(); @@ -72,6 +80,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib this.futId = futId; this.miniId = miniId; this.txNum = txNum; + this.nearCheck = nearCheck; } /** @@ -109,6 +118,13 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib return sys; } + /** + * @return Near check flag. + */ + public boolean nearCheck() { + return nearCheck; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -137,18 +153,24 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib writer.incrementState(); case 8: - if (!writer.writeMessage("nearXidVer", nearXidVer)) + if (!writer.writeBoolean("nearCheck", nearCheck)) return false; writer.incrementState(); case 9: - if (!writer.writeBoolean("sys", sys)) + if (!writer.writeMessage("nearXidVer", nearXidVer)) return false; writer.incrementState(); case 10: + if (!writer.writeBoolean("sys", sys)) + return false; + + writer.incrementState(); + + case 11: if (!writer.writeInt("txNum", txNum)) return false; @@ -187,7 +209,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib reader.incrementState(); case 8: - nearXidVer = reader.readMessage("nearXidVer"); + nearCheck = reader.readBoolean("nearCheck"); if (!reader.isLastRead()) return false; @@ -195,7 
+217,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib reader.incrementState(); case 9: - sys = reader.readBoolean("sys"); + nearXidVer = reader.readMessage("nearXidVer"); if (!reader.isLastRead()) return false; @@ -203,6 +225,14 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib reader.incrementState(); case 10: + sys = reader.readBoolean("sys"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: txNum = reader.readInt("txNum"); if (!reader.isLastRead()) @@ -222,7 +252,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 12; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 239efc3..98fbd47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -304,6 +304,26 @@ public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopolo } /** {@inheritDoc} */ + @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) { + lock.readLock().lock(); + + try { + GridDhtPartitionMap partMap = node2part.get(nodeId); + + if (partMap != null) { + GridDhtPartitionState state = partMap.get(part); + + return state == null ? 
EVICTED : state; + } + + return EVICTED; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public Collection<ClusterNode> nodes(int p, long topVer) { lock.readLock().lock(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index d9a20ae..314d70e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -118,6 +118,13 @@ public interface GridDhtPartitionTopology<K, V> { public GridDhtPartitionMap localPartitionMap(); /** + * @param nodeId Node ID. + * @param part Partition. + * @return Partition state. + */ + public GridDhtPartitionState partitionState(UUID nodeId, int part); + + /** * @return Current update sequence. 
*/ public long updateSequence(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index acf00eb..40fde60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -572,6 +572,26 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, } /** {@inheritDoc} */ + @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) { + lock.readLock().lock(); + + try { + GridDhtPartitionMap partMap = node2part.get(nodeId); + + if (partMap != null) { + GridDhtPartitionState state = partMap.get(part); + + return state == null ? 
EVICTED : state; + } + + return EVICTED; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public Collection<ClusterNode> nodes(int p, long topVer) { Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index bb80480..dc4d15c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -325,6 +325,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.subjectId(), tx.taskNameHash()); + req.writeVersion(tx.writeVersion()); + try { cctx.io().send(n, req, tx.ioPolicy()); @@ -371,8 +373,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.subjectId(), tx.taskNameHash()); - if (tx.onePhaseCommit()) - req.writeVersion(tx.writeVersion()); + req.writeVersion(tx.writeVersion()); try { cctx.io().send(nearMapping.node(), req, tx.ioPolicy()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 2b1d2f0..33ee64c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -294,7 +294,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters()); - if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) { + if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) { cached.unswap(true, retVal); V val = cached.innerGet( @@ -311,14 +311,13 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu null, null); - if (retVal) { + if (retVal || txEntry.op() == TRANSFORM) { if (!F.isEmpty(txEntry.entryProcessors())) { K key = txEntry.key(); Object procRes = null; Exception err = null; - for (T2<EntryProcessor<K, V, ?>, Object[]> t : txEntry.entryProcessors()) { try { CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(txEntry.context(), key, val); @@ -336,6 +335,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } } + txEntry.entryProcessorCalculatedValue(val); + if (err != null || procRes != null) ret.addEntryProcessResult(key, err == null ? 
new CacheInvokeResult<>(procRes) : new CacheInvokeResult<>(err)); @@ -360,7 +361,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu ret.success(false); } else - ret.success(txEntry.op() != GridCacheOperation.DELETE || cached.hasValue()); + ret.success(txEntry.op() != DELETE || cached.hasValue()); } } catch (IgniteCheckedException e) { @@ -1002,7 +1003,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu private boolean map( IgniteTxEntry<K, V> entry, Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap, - Map<UUID, GridDistributedTxMapping<K, V>> futNearMap) { + Map<UUID, GridDistributedTxMapping<K, V>> futNearMap + ) { if (entry.cached().isLocal()) return false; @@ -1069,14 +1071,31 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu * @param locMap Exclude map. * @return {@code True} if mapped. */ - private boolean map(IgniteTxEntry<K, V> entry, Iterable<ClusterNode> nodes, - Map<UUID, GridDistributedTxMapping<K, V>> globalMap, Map<UUID, GridDistributedTxMapping<K, V>> locMap) { + private boolean map( + IgniteTxEntry<K, V> entry, + Iterable<ClusterNode> nodes, + Map<UUID, GridDistributedTxMapping<K, V>> globalMap, + Map<UUID, GridDistributedTxMapping<K, V>> locMap + ) { boolean ret = false; if (nodes != null) { for (ClusterNode n : nodes) { GridDistributedTxMapping<K, V> global = globalMap.get(n.id()); + if (!F.isEmpty(entry.entryProcessors())) { + GridDhtPartitionState state = entry.context().topology().partitionState(n.id(), + entry.cached().partition()); + + if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) { + V procVal = entry.entryProcessorCalculatedValue(); + + entry.op(procVal == null ? 
DELETE : UPDATE); + entry.value(procVal, true, false); + entry.entryProcessors(null); + } + } + if (global == null) globalMap.put(n.id(), global = new GridDistributedTxMapping<>(n)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 3366033..ed37ae4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -203,7 +203,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> /** * @return Near node ID. 
*/ - UUID nearNodeId() { + public UUID nearNodeId() { return nearNodeId; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 8410ef7..e11a5b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -160,6 +160,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter private AtomicBoolean preparing = new AtomicBoolean(); /** */ + @GridToStringInclude private Set<Integer> invalidParts = new GridLeanSet<>(); /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 6bb768c..286f896 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -83,6 +83,9 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, @GridToStringInclude private Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessorsCol; + /** Transient field for calculated entry 
processor value. */ + private V entryProcessorCalcVal; + /** Transform closure bytes. */ @GridToStringExclude private byte[] transformClosBytes; @@ -420,6 +423,20 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, } /** + * @return Entry processor calculated value. + */ + public V entryProcessorCalculatedValue() { + return entryProcessorCalcVal; + } + + /** + * @param entryProcessorCalcVal Entry processor calculated value. + */ + public void entryProcessorCalculatedValue(V entryProcessorCalcVal) { + this.entryProcessorCalcVal = entryProcessorCalcVal; + } + + /** * @return Underlying cache entry. */ public GridCacheEntryEx<K, V> cached() { @@ -902,7 +919,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, @Override public String toString() { return GridToStringBuilder.toString(IgniteTxEntry.class, this, "keyBytesSize", keyBytes == null ? "null" : Integer.toString(keyBytes.length), - "xidVer", tx == null ? "null" : tx.xidVersion()); + "xidVer", tx == null ? 
"null" : tx.xidVersion(), "hc", System.identityHashCode(this)); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 90af0b2..ca58d6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -798,15 +798,14 @@ public class IgniteTxHandler<K, V> { try { if (req.commit() || req.isSystemInvalidate()) { - if (tx.commitVersion(req.commitVersion())) { - tx.invalidate(req.isInvalidate()); - tx.systemInvalidate(req.isSystemInvalidate()); + tx.commitVersion(req.commitVersion()); + tx.invalidate(req.isInvalidate()); + tx.systemInvalidate(req.isSystemInvalidate()); - // Complete remote candidates. - tx.doneRemote(req.version()); + // Complete remote candidates. + tx.doneRemote(req.version()); - tx.commit(); - } + tx.commit(); } else { tx.doneRemote(req.version()); @@ -986,8 +985,14 @@ public class IgniteTxHandler<K, V> { // in prepare phase will get properly ordered as well. tx.prepare(); - if (req.last()) + if (req.last()) { + assert !F.isEmpty(req.transactionNodes()) : + "Received last prepare request with empty transaction nodes: " + req; + + tx.transactionNodes(req.transactionNodes()); + tx.state(PREPARED); + } res.invalidPartitions(tx.invalidPartitions()); @@ -1085,20 +1090,69 @@ public class IgniteTxHandler<K, V> { * @param nodeId Node ID. * @param req Request. 
*/ - protected void processCheckPreparedTxRequest(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest<K, V> req) { + protected void processCheckPreparedTxRequest( + final UUID nodeId, + final GridCacheOptimisticCheckPreparedTxRequest<K, V> req + ) { if (log.isDebugEnabled()) log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']'); - boolean prepared = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions()); + if (req.nearCheck()) { + IgniteInternalFuture<Boolean> fut = ctx.tm().nearTxCommitted(req.nearXidVersion()); + + fut.listenAsync(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> f) { + try { + boolean prepared = f.get(); - GridCacheOptimisticCheckPreparedTxResponse<K, V> res = - new GridCacheOptimisticCheckPreparedTxResponse<>(req.version(), req.futureId(), req.miniId(), prepared); + sendCheckPrepareTxResponse(nodeId, + new GridCacheOptimisticCheckPreparedTxResponse<K, V>( + req.version(), + req.futureId(), + req.miniId(), + prepared), + req.system()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to wait for transaction check prepared future " + + "(will send rolled back response): " + req.nearXidVersion(), e); + + sendCheckPrepareTxResponse(nodeId, + new GridCacheOptimisticCheckPreparedTxResponse<K, V>( + req.version(), + req.futureId(), + req.miniId(), + false), + req.system()); + } + } + }); + } + else { + boolean prepared = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions()); + + sendCheckPrepareTxResponse(nodeId, + new GridCacheOptimisticCheckPreparedTxResponse<K, V>(req.version(), req.futureId(), req.miniId(), prepared), + req.system()); + } + } + + /** + * @param nodeId Node ID. + * @param res Response to send. + * @param sys System pool flag. 
+ */ + private void sendCheckPrepareTxResponse( + UUID nodeId, + GridCacheOptimisticCheckPreparedTxResponse<K, V> res, + boolean sys + ) { try { if (log.isDebugEnabled()) log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']'); - ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + ctx.io().send(nodeId, res, sys ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (ClusterTopologyCheckedException ignored) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index e347cce..974144a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1653,6 +1653,29 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { } /** + * @param nearVer Near version to check. + * @return Future. + */ + public IgniteInternalFuture<Boolean> nearTxCommitted(GridCacheVersion nearVer) { + for (final IgniteInternalTx<K, V> tx : txs()) { + if (tx.near() && tx.xidVersion().equals(nearVer)) { + return tx.done() ? + new GridFinishedFutureEx<>(tx.state() == COMMITTED) : + tx.finishFuture().chain(new C1<IgniteInternalFuture<IgniteInternalTx>, Boolean>() { + @Override public Boolean apply(IgniteInternalFuture<IgniteInternalTx> f) { + return tx.state() == COMMITTED; + } + }); + } + } + + // Transaction was not found. Check committed versions buffer. 
+ Boolean res = completedVers.get(nearVer); + + return new GridFinishedFutureEx<>(res != null && res); + } + + /** * Gets local transaction for pessimistic tx recovery. * * @param nearXidVer Near tx ID. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java index 801e27f..37817c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java @@ -21,7 +21,9 @@ import org.apache.ignite.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import java.util.*; @@ -30,7 +32,7 @@ import static org.apache.ignite.transactions.TransactionConcurrency.*; import static org.apache.ignite.transactions.TransactionIsolation.*; /** - * {@link org.apache.ignite.IgniteQueue} implementation using transactional cache. + * {@link IgniteQueue} implementation using transactional cache. 
*/ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { /** @@ -72,7 +74,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> break; } } - catch (ClusterTopologyCheckedException e) { + catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) { if (e instanceof ClusterGroupEmptyCheckedException) throw e; @@ -119,14 +121,14 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> break; } - catch (ClusterTopologyCheckedException e) { + catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) { if (e instanceof ClusterGroupEmptyCheckedException) throw e; if (cnt++ == MAX_UPDATE_RETRIES) throw e; else { - U.warn(log, "Failed to add item, will retry [err=" + e + ']'); + U.warn(log, "Failed to poll item, will retry [err=" + e + ']'); U.sleep(RETRY_DELAY); } @@ -176,7 +178,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> break; } - catch (ClusterTopologyCheckedException e) { + catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) { if (e instanceof ClusterGroupEmptyCheckedException) throw e; @@ -219,7 +221,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> break; } - catch (ClusterTopologyCheckedException e) { + catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) { if (e instanceof ClusterGroupEmptyCheckedException) throw e; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/main/java/org/jdk8/backport/ConcurrentLinkedHashMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/jdk8/backport/ConcurrentLinkedHashMap.java b/modules/core/src/main/java/org/jdk8/backport/ConcurrentLinkedHashMap.java index 
8992e77..e3f16a2 100644 --- a/modules/core/src/main/java/org/jdk8/backport/ConcurrentLinkedHashMap.java +++ b/modules/core/src/main/java/org/jdk8/backport/ConcurrentLinkedHashMap.java @@ -724,6 +724,13 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements if (!onlyIfAbsent) { e.val = val; + HashEntry<K, V> qEntry = (HashEntry<K, V>)e.node.item(); + + if (qEntry != null && qEntry != e) + qEntry.val = val; + + ((HashEntry<K, V>)e.node.item).val = val; + modified = true; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b0bd71b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java index 75ac896..49f24b0 100644 --- a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java @@ -176,6 +176,24 @@ public class GridConcurrentLinkedHashMapSelfTest extends GridCommonAbstractTest } /** + * @throws Exception If failed. + */ + public void testRehash() throws Exception { + Map<Integer, Date> map = new ConcurrentLinkedHashMap<>(10); + + for (int i = 0; i < 100; i++) + // Will initiate rehash in the middle. + map.put(i, new Date(0)); + + for (int i = 0; i < 100; i++) + // Will initiate rehash in the middle. + map.put(i, new Date(1)); + + for (Date date : map.values()) + assertEquals(1L, date.getTime()); + } + + /** * */ public void testDescendingMethods() {