Repository: incubator-ignite Updated Branches: refs/heads/ignite-23 12761e440 -> d12dd4173
# ignite-23 remap for tx updates from client Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d12dd417 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d12dd417 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d12dd417 Branch: refs/heads/ignite-23 Commit: d12dd41736ead57d0f68aa211edde0b4922733fc Parents: 12761e4 Author: sboikov <sboi...@gridgain.com> Authored: Thu May 21 11:54:43 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu May 21 14:29:58 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 5 +- .../GridCachePartitionExchangeManager.java | 44 ++-- .../distributed/GridDistributedTxMapping.java | 17 ++ .../distributed/dht/GridDhtLockFuture.java | 10 +- .../dht/GridDhtTransactionalCacheAdapter.java | 10 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 8 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 3 +- .../dht/colocated/GridDhtColocatedCache.java | 12 +- .../colocated/GridDhtColocatedLockFuture.java | 8 +- .../GridDhtPartitionsExchangeFuture.java | 16 +- .../distributed/near/GridNearLockRequest.java | 1 + .../near/GridNearOptimisticTxPrepareFuture.java | 54 +++-- .../GridNearPessimisticTxPrepareFuture.java | 5 +- .../cache/distributed/near/GridNearTxLocal.java | 16 ++ .../near/GridNearTxPrepareRequest.java | 72 +++++-- .../near/GridNearTxPrepareResponse.java | 68 +++++-- .../cache/transactions/IgniteInternalTx.java | 5 + .../cache/transactions/IgniteTxAdapter.java | 15 +- .../cache/transactions/IgniteTxHandler.java | 117 ++++++++--- ...niteCacheClientNodeChangingTopologyTest.java | 200 ++++++++++++++++++- 20 files changed, 532 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 02f16c0..b8a4c87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -488,7 +488,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { req.miniId(), req.version(), req.version(), - null, null, null); + null, + null, + null, + false); res.error(req.classError()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 25e18db..41a13ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -706,9 +706,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionsExchangeFuture old = exchFuts.addx( fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs)); - if (old != null) + if (old != null) { fut = old; + if (reqs != null) + fut.cacheChangeRequests(reqs); + } + if (discoEvt != null) fut.onEvent(exchId, discoEvt); @@ -870,17 +874,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } else { if (msg.client()) { - IgniteInternalFuture<?> fut = affinityReadyFuture(msg.exchangeId().topologyVersion()); - - if (fut != null) { - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - processSinglePartitionClientUpdate(node, msg); - } - }); - } - else - processSinglePartitionClientUpdate(node, msg); + final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(), + null, + null); + + exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + // Finished future should reply only to sender client node. + exchFut.onReceive(node.id(), msg); + } + }); } else exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg); @@ -892,23 +895,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param node Node. - * @param msg Message. - */ - private void processSinglePartitionClientUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { - final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(), - null, - null); - - exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - // Finished future should reply only to sender client node. - exchFut.onReceive(node.id(), msg); - } - }); - } - - /** * @param node Node ID. * @param msg Message. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java index fded3c9..bd1dedf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -63,6 +63,9 @@ public class GridDistributedTxMapping implements Externalizable { /** {@code True} if mapping is for near caches, {@code false} otherwise. */ private boolean near; + /** {@code True} if this is first mapping for optimistic tx on client node. */ + private boolean clientFirst; + /** * Empty constructor required for {@link Externalizable}. */ @@ -108,6 +111,20 @@ public class GridDistributedTxMapping implements Externalizable { } /** + * @return {@code True} if this is first mapping for optimistic tx on client node. + */ + public boolean clientFirst() { + return clientFirst; + } + + /** + * @param clientFirst {@code True} if this is first mapping for optimistic tx on client node. + */ + public void clientFirst(boolean clientFirst) { + this.clientFirst = clientFirst; + } + + /** * @return {@code True} if mapping is for near caches, {@code false} otherwise. */ public boolean near() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index c57eded..bdaa552 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -47,7 +47,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*; /** * Cache lock future. */ -public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> +public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheMvccFuture<Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion { /** */ private static final long serialVersionUID = 0L; @@ -60,7 +60,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo /** Cache registry. */ @GridToStringExclude - private GridCacheContext<K, V> cctx; + private GridCacheContext<?, ?> cctx; /** Near node ID. */ private UUID nearNodeId; @@ -151,7 +151,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param skipStore Skip store flag. */ public GridDhtLockFuture( - GridCacheContext<K, V> cctx, + GridCacheContext<?, ?> cctx, UUID nearNodeId, GridCacheVersion nearLockVer, @NotNull AffinityTopologyVersion topVer, @@ -221,7 +221,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param cacheCtx Cache context. * @param invalidPart Partition to retry. */ - void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int invalidPart) { + void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int invalidPart) { invalidParts.add(invalidPart); // Register invalid partitions with transaction. @@ -1170,7 +1170,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param entries Entries to check. */ @SuppressWarnings({"ForLoopReplaceableByForEach"}) - private void evictReaders(GridCacheContext<K, V> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId, + private void evictReaders(GridCacheContext<?, ?> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId, @Nullable List<GridDhtCacheEntry> entries) { if (entries == null || keys == null || entries.isEmpty() || keys.isEmpty()) return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 26eef50..bda0868 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -518,7 +518,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach return; } - // Group lock can be only started from local node, so we never start group lock transaction on remote node. IgniteInternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null); // Register listener just so we print out errors. @@ -534,8 +533,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) { assert nodeId != null; assert res != null; - GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(), - res.futureId()); + GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId()); if (fut == null) { if (log.isDebugEnabled()) @@ -604,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach assert tx != null; - GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>( + GridDhtLockFuture fut = new GridDhtLockFuture( ctx, tx.nearNodeId(), tx.nearXidVersion(), @@ -719,10 +717,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (filter == null) filter = req.filter(); - GridDhtLockFuture<K, V> fut = null; + GridDhtLockFuture fut = null; if (!req.inTx()) { - fut = new GridDhtLockFuture<>(ctx, + fut = new GridDhtLockFuture(ctx, nearNode.id(), req.version(), req.topologyVersion(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 54b59b8..90edb0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -52,15 +52,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { private static final long serialVersionUID = 0L; /** Near mappings. */ - protected Map<UUID, GridDistributedTxMapping> nearMap = - new ConcurrentHashMap8<>(); + protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>(); /** DHT mappings. */ - protected Map<UUID, GridDistributedTxMapping> dhtMap = - new ConcurrentHashMap8<>(); + protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>(); /** Mapped flag. */ - private AtomicBoolean mapped = new AtomicBoolean(); + protected AtomicBoolean mapped = new AtomicBoolean(); /** */ private long dhtThreadId; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 293cf95..aed00ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -582,7 +582,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter tx.writeVersion(), tx.invalidPartitions(), ret, - prepErr); + prepErr, + false); if (prepErr == null) { addDhtValues(res); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 05b3c7b..221b230 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -362,13 +362,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Nullable TransactionIsolation isolation, long accessTtl ) { - assert tx == null || tx instanceof GridNearTxLocal; + assert tx == null || tx instanceof GridNearTxLocal : tx; GridNearTxLocal txx = (GridNearTxLocal)tx; CacheOperationContext opCtx = ctx.operationContextPerCall(); - GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx, + GridDhtColocatedLockFuture fut = new GridDhtColocatedLockFuture(ctx, keys, txx, isRead, @@ -619,7 +619,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @return Lock future. */ IgniteInternalFuture<Exception> lockAllAsync( - final GridCacheContext<K, V> cacheCtx, + final GridCacheContext<?, ?> cacheCtx, @Nullable final GridNearTxLocal tx, final long threadId, final GridCacheVersion ver, @@ -700,7 +700,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @return Lock future. */ private IgniteInternalFuture<Exception> lockAllAsync0( - GridCacheContext<K, V> cacheCtx, + GridCacheContext<?, ?> cacheCtx, @Nullable final GridNearTxLocal tx, long threadId, final GridCacheVersion ver, @@ -715,7 +715,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte int cnt = keys.size(); if (tx == null) { - GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx, + GridDhtLockFuture fut = new GridDhtLockFuture(ctx, ctx.localNodeId(), ver, topVer, @@ -838,7 +838,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte assert nodeId != null; assert res != null; - GridDhtColocatedLockFuture<K, V> fut = (GridDhtColocatedLockFuture<K, V>)ctx.mvcc(). + GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc(). <Boolean>future(res.version(), res.futureId()); if (fut != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 372c517..9d19152 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.*; /** * Colocated cache lock future. */ -public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> +public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> { /** */ private static final long serialVersionUID = 0L; @@ -59,7 +59,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity /** Cache registry. */ @GridToStringExclude - private GridCacheContext<K, V> cctx; + private GridCacheContext<?, ?> cctx; /** Lock owner thread. */ @GridToStringInclude @@ -121,10 +121,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @param timeout Lock acquisition timeout. * @param accessTtl TTL for read operation. * @param filter Filter. - * @param skipStore + * @param skipStore Skip store flag. */ public GridDhtColocatedLockFuture( - GridCacheContext<K, V> cctx, + GridCacheContext<?, ?> cctx, Collection<KeyCacheObject> keys, @Nullable GridNearTxLocal tx, boolean read, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 94ca540..af7fa5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -229,15 +229,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT initFut = new GridFutureAdapter<>(); - // Grab all nodes with order of equal or less than last joined node. - ClusterNode node = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion()); - - oldestNode.set(node); - if (log.isDebugEnabled()) log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']'); } + /** + * @param reqs Cache change requests. + */ + public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) { + this.reqs = reqs; + } + /** {@inheritDoc} */ @Override public GridDiscoveryTopologySnapshot topologySnapshot() throws IgniteCheckedException { get(); @@ -461,6 +463,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert discoEvt != null : this; assert !dummy && !forcePreload : this; + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion()); + + oldestNode.set(oldest); + startCaches(); // True if client node joined or failed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index e71dd65..35ef2e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -98,6 +98,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { * @param implicitTx Flag to indicate that transaction is implicit. * @param implicitSingleTx Implicit-transaction-with-one-key flag. * @param isRead Indicates whether implicit lock is for read or write operation. + * @param retVal Return value flag. * @param isolation Transaction isolation. * @param isInvalidate Invalidation flag. * @param timeout Lock timeout. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 4f74303..713b713 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -221,18 +221,18 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd if (topVer != null) { tx.topologyVersion(topVer); - prepare0(); + prepare0(false); return; } - prepareOnTopology(); + prepareOnTopology(false); } /** - * + * @param remap Remap flag. */ - private void prepareOnTopology() { + private void prepareOnTopology(final boolean remap) { GridDhtTopologyFuture topFut = topologyReadLock(); try { @@ -265,16 +265,19 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd return; } - tx.topologyVersion(topFut.topologyVersion()); + if (remap) + tx.onRemap(topFut.topologyVersion()); + else + tx.topologyVersion(topFut.topologyVersion()); - prepare0(); + prepare0(remap); } else { topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { - prepareOnTopology(); + prepareOnTopology(remap); } }); } @@ -346,10 +349,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd /** * Initializes future. + * + * @param remap Remap flag. */ - private void prepare0() { + private void prepare0(boolean remap) { try { - if (!tx.state(PREPARING)) { + boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING); + + if (!txStateCheck) { if (tx.setRollbackOnly()) { if (tx.timedOut()) onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + @@ -366,7 +373,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd } // Make sure to add future before calling prepare. - cctx.mvcc().addFuture(this); + if (!remap) + cctx.mvcc().addFuture(this); prepare( tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(), @@ -502,7 +510,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd tx.implicitSingle(), m.explicitLock(), tx.subjectId(), - tx.taskNameHash()); + tx.taskNameHash(), + m.clientFirst()); for (IgniteTxEntry txEntry : m.writes()) { if (txEntry.op() == TRANSFORM) @@ -560,13 +569,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd * @param entry Transaction entry. * @param topVer Topology version. * @param cur Current mapping. + * @param waitLock Wait lock flag. * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key. * @return Mapping. */ private GridDistributedTxMapping map( IgniteTxEntry entry, AffinityTopologyVersion topVer, - GridDistributedTxMapping cur, + @Nullable GridDistributedTxMapping cur, boolean waitLock ) throws IgniteCheckedException { GridCacheContext cacheCtx = entry.context(); @@ -599,10 +609,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd } if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) { + boolean clientFirst = cur == null && cctx.kernalContext().clientNode(); + cur = new GridDistributedTxMapping(primary); // Initialize near flag right away. cur.near(cacheCtx.isNear()); + + cur.clientFirst(clientFirst); } cur.add(entry); @@ -748,11 +762,19 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd onError(nodeId, mappings, res.error()); } else { - onPrepareResponse(m, res); + if (res.clientRemap()) { + assert cctx.kernalContext().clientNode(); + assert m.clientFirst(); + + prepareOnTopology(true); + } + else { + onPrepareResponse(m, res); - // Proceed prepare before finishing mini future. - if (mappings != null) - proceedPrepare(mappings); + // Proceed prepare before finishing mini future. + if (mappings != null) + proceedPrepare(mappings); + } // Finish this mini future. onDone(tx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index bce62c1..1d06860 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -84,6 +84,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA /** {@inheritDoc} */ @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { if (!isDone()) { + assert !res.clientRemap() : res; + for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { MiniFuture f = (MiniFuture)fut; @@ -187,7 +189,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA tx.implicitSingle(), m.explicitLock(), tx.subjectId(), - tx.taskNameHash()); + tx.taskNameHash(), + false); for (IgniteTxEntry txEntry : m.writes()) { if (txEntry.op() == TRANSFORM) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index c38965d..e7664c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -1185,6 +1185,22 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** {@inheritDoc} */ + @Override public void onRemap(AffinityTopologyVersion topVer) { + assert cctx.kernalContext().clientNode(); + + mapped.set(false); + nearLocallyMapped = false; + colocatedLocallyMapped = false; + txNodes = null; + onePhaseCommit = false; + nearMap.clear(); + dhtMap.clear(); + mappings.clear(); + + this.topVer.set(topVer); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNearTxLocal.class, this, "mappings", mappings.keySet(), "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index a08637d..b602a7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -75,6 +75,9 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { /** Task name hash. */ private int taskNameHash; + /** {@code True} if first optimistic tx prepare request sent from client node. */ + private boolean firstClientReq; + /** * Empty constructor required for {@link Externalizable}. */ @@ -92,8 +95,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { * @param txNodes Transaction nodes mapping. * @param last {@code True} if this last prepare request for node. * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare. + * @param onePhaseCommit One phase commit flag. + * @param retVal Return value flag. + * @param implicitSingle Implicit single flag. + * @param explicitLock Explicit lock flag. * @param subjId Subject ID. * @param taskNameHash Task name hash. + * @param firstClientReq {@code True} if first optimistic tx prepare request sent from client node. */ public GridNearTxPrepareRequest( IgniteUuid futId, @@ -110,11 +118,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { boolean implicitSingle, boolean explicitLock, @Nullable UUID subjId, - int taskNameHash + int taskNameHash, + boolean firstClientReq ) { super(tx, reads, writes, txNodes, onePhaseCommit); assert futId != null; + assert !firstClientReq || tx.optimistic() : tx; this.futId = futId; this.topVer = topVer; @@ -126,6 +136,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { this.explicitLock = explicitLock; this.subjId = subjId; this.taskNameHash = taskNameHash; + this.firstClientReq = firstClientReq; + } + + /** + * @return {@code True} if first optimistic tx prepare request sent from client node. + */ + public boolean firstClientRequest() { + return firstClientReq; } /** @@ -273,60 +291,66 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { writer.incrementState(); case 24: - if (!writer.writeIgniteUuid("futId", futId)) + if (!writer.writeBoolean("firstClientReq", firstClientReq)) return false; writer.incrementState(); case 25: - if (!writer.writeBoolean("implicitSingle", implicitSingle)) + if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); case 26: - if (!writer.writeBoolean("last", last)) + if (!writer.writeBoolean("implicitSingle", implicitSingle)) return false; writer.incrementState(); case 27: - if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID)) + if (!writer.writeBoolean("last", last)) return false; writer.incrementState(); case 28: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID)) return false; writer.incrementState(); case 29: - if (!writer.writeBoolean("near", near)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 30: - if (!writer.writeBoolean("retVal", retVal)) + if (!writer.writeBoolean("near", near)) return false; writer.incrementState(); case 31: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("retVal", retVal)) return false; writer.incrementState(); case 32: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 33: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 34: if (!writer.writeMessage("topVer", topVer)) return false; @@ -357,7 +381,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 24: - futId = reader.readIgniteUuid("futId"); + firstClientReq = reader.readBoolean("firstClientReq"); if (!reader.isLastRead()) return false; @@ -365,7 +389,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 25: - implicitSingle = reader.readBoolean("implicitSingle"); + futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) return false; @@ -373,7 +397,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 26: - last = reader.readBoolean("last"); + implicitSingle = reader.readBoolean("implicitSingle"); if (!reader.isLastRead()) return false; @@ -381,7 +405,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 27: - lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID); + last = reader.readBoolean("last"); if (!reader.isLastRead()) return false; @@ -389,7 +413,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 28: - miniId = reader.readIgniteUuid("miniId"); + lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID); if (!reader.isLastRead()) return false; @@ -397,7 +421,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 29: - near = reader.readBoolean("near"); + miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) return false; @@ -405,7 +429,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 30: - retVal = reader.readBoolean("retVal"); + near = reader.readBoolean("near"); if (!reader.isLastRead()) return false; @@ -413,7 +437,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 31: - subjId = reader.readUuid("subjId"); + retVal = reader.readBoolean("retVal"); if (!reader.isLastRead()) return false; @@ -421,7 +445,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 32: - taskNameHash = reader.readInt("taskNameHash"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -429,6 +453,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 33: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 34: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -448,7 +480,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 34; + return 35; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index f8c07f7..e4dfd4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -83,6 +83,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse @GridDirectCollection(IgniteTxKey.class) private Collection<IgniteTxKey> filterFailedKeys; + /** */ + private boolean clientRemap; + /** * Empty constructor required by {@link Externalizable}. */ @@ -95,9 +98,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse * @param futId Future ID. * @param miniId Mini future ID. * @param dhtVer DHT version. + * @param writeVer Write version. * @param invalidParts Invalid partitions. * @param retVal Return value. * @param err Error. + * @param clientRemap {@code True} if client node should remap transaction. */ public GridNearTxPrepareResponse( GridCacheVersion xid, @@ -107,7 +112,8 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse GridCacheVersion writeVer, Collection<Integer> invalidParts, GridCacheReturn retVal, - Throwable err + Throwable err, + boolean clientRemap ) { super(xid, err); @@ -121,6 +127,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse this.writeVer = writeVer; this.invalidParts = invalidParts; this.retVal = retVal; + this.clientRemap = clientRemap; + } + + /** + * @return {@code True} if client node should remap transaction. + */ + public boolean clientRemap() { + return clientRemap; } /** @@ -330,60 +344,66 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse switch (writer.state()) { case 10: - if (!writer.writeMessage("dhtVer", dhtVer)) + if (!writer.writeBoolean("clientRemap", clientRemap)) return false; writer.incrementState(); case 11: - if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("dhtVer", dhtVer)) return false; writer.incrementState(); case 12: - if (!writer.writeIgniteUuid("futId", futId)) + if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 13: - if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT)) + if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); case 14: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 15: - if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 16: - if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 17: - if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 18: - if (!writer.writeMessage("retVal", retVal)) + if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 19: + if (!writer.writeMessage("retVal", retVal)) + return false; + + writer.incrementState(); + + case 20: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -406,7 +426,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse switch (reader.state()) { case 10: - dhtVer = reader.readMessage("dhtVer"); + clientRemap = reader.readBoolean("clientRemap"); if (!reader.isLastRead()) return false; @@ -414,7 +434,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 11: - filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG); + dhtVer = reader.readMessage("dhtVer"); if (!reader.isLastRead()) return false; @@ -422,7 +442,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 12: - futId = reader.readIgniteUuid("futId"); + filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -430,7 +450,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 13: - invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT); + futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) return false; @@ -438,7 +458,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 14: - miniId = reader.readIgniteUuid("miniId"); + invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -446,7 +466,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 15: - ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG); + miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) return false; @@ -454,7 +474,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 16: - ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG); + ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -462,7 +482,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 17: - pending = reader.readCollection("pending", MessageCollectionItemType.MSG); + ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -470,7 +490,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 18: - retVal = reader.readMessage("retVal"); + pending = reader.readCollection("pending", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -478,6 +498,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 19: + retVal = reader.readMessage("retVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 20: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -497,7 +525,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 20; + return 21; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 5f877ec..cb86e0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -706,4 +706,9 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { * @return Public API proxy. */ public TransactionProxy proxy(); + + /** + * @param topVer New topology version. + */ + public void onRemap(AffinityTopologyVersion topVer); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index eb8825e..8cb9cc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -184,7 +184,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter private AtomicReference<GridFutureAdapter<IgniteInternalTx>> finFut = new AtomicReference<>(); /** Topology version. */ - private AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE); + protected AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE); /** Mutex. */ private final Lock lock = new ReentrantLock(); @@ -493,13 +493,17 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ + @Override public void onRemap(AffinityTopologyVersion topVer) { + assert false; + } + + /** {@inheritDoc} */ @Override public boolean hasTransforms() { return transform; } /** {@inheritDoc} */ - @Override - public boolean markPreparing() { + @Override public boolean markPreparing() { return preparing.compareAndSet(false, true); } @@ -1716,6 +1720,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ + @Override public void onRemap(AffinityTopologyVersion topVer) { + throw new IllegalStateException("Deserialized transaction can only be used as read-only."); + } + + /** {@inheritDoc} */ @Override public boolean empty() { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index f466bf2..f815a73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -229,14 +229,22 @@ public class IgniteTxHandler { return null; } + IgniteTxEntry firstEntry = null; + try { - for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) + for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) { e.unmarshal(ctx, false, ctx.deploy().globalLoader()); + + if (firstEntry == null) + firstEntry = e; + } } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); } + assert firstEntry != null : req; + GridDhtTxLocal tx; GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version()); @@ -253,36 +261,87 @@ public class IgniteTxHandler { } } else { - tx = new GridDhtTxLocal( - ctx, - nearNode.id(), - req.version(), - req.futureId(), - req.miniId(), - req.threadId(), - req.implicitSingle(), - req.implicitSingle(), - req.system(), - req.explicitLock(), - req.policy(), - req.concurrency(), - req.isolation(), - req.timeout(), - req.isInvalidate(), - false, - req.txSize(), - req.transactionNodes(), - req.subjectId(), - req.taskNameHash() - ); + GridDhtPartitionTopology top = null; - tx = ctx.tm().onCreated(null, tx); + if (req.firstClientRequest()) { + assert req.concurrency().equals(OPTIMISTIC) : req; - if (tx != null) - tx.topologyVersion(req.topologyVersion()); - else - U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" + - req.version() + ", req=" + req + ']'); + top = firstEntry.context().topology(); + + top.readLock(); + } + + try { + if (top != null && !top.topologyVersion().equals(req.topologyVersion())) { + if (log.isDebugEnabled()) { + log.debug("Client topology version mismatch, need remap transaction [" + + "reqTopVer=" + req.topologyVersion() + + ", locTopVer=" + top.topologyVersion() + + ", req=" + req + ']'); + } + + GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( + req.version(), + req.futureId(), + req.miniId(), + req.version(), + req.version(), + null, + null, + null, + true); + + try { + ctx.io().send(nearNode, res, req.policy()); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send client tx remap response, client node failed " + + "[node=" + nearNode + ", req=" + req + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send client tx remap response " + + "[node=" + nearNode + ", req=" + req + ']', e); + } + + return new GridFinishedFuture<>(res); + } + + tx = new GridDhtTxLocal( + ctx, + nearNode.id(), + req.version(), + req.futureId(), + req.miniId(), + req.threadId(), + req.implicitSingle(), + req.implicitSingle(), + req.system(), + req.explicitLock(), + req.policy(), + req.concurrency(), + req.isolation(), + req.timeout(), + req.isInvalidate(), + false, + req.txSize(), + req.transactionNodes(), + req.subjectId(), + req.taskNameHash() + ); + + tx = ctx.tm().onCreated(null, tx); + + if (tx != null) + tx.topologyVersion(req.topologyVersion()); + else + U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" + + req.version() + ", req=" + req + ']'); + } + finally { + if (top != null) + top.readUnlock(); + } } if (tx != null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index f964d39..3b80c2f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -38,6 +38,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; import javax.cache.*; import java.util.*; @@ -228,7 +229,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac final Map<Integer, Integer> map = new HashMap<>(); - for (int i = 0; i < 1; i++) + for (int i = 0; i < 100; i++) map.put(i, i); TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); @@ -252,7 +253,76 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac IgniteEx ignite3 = startGrid(3); - log.info("Stop block1."); + log.info("Stop block."); + + spi.stopBlock(); + + putFut.get(); + + checkData(map, 4); + + map.clear(); + + for (int i = 0; i < 100; i++) + map.put(i, i + 1); + + cache.putAll(map); + + checkData(map, 4); + } + + /** + * @throws Exception If failed. + */ + public void _testPessimisticTx() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(0); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + final Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 1; i++) + map.put(i, i); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try (Transaction tx = ignite2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + + return null; + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + IgniteEx ignite3 = startGrid(3); + + log.info("Stop block."); spi.stopBlock(); @@ -264,6 +334,99 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac /** * @throws Exception If failed. */ + public void testOptimisticTxMessageClientFirstFlag() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + List<Integer> keys0 = primaryKeys(ignite0.cache(null), 2, 0); + List<Integer> keys1 = primaryKeys(ignite1.cache(null), 2, 0); + List<Integer> keys2 = primaryKeys(ignite2.cache(null), 2, 0); + + LinkedHashMap<Integer, Integer> map = new LinkedHashMap<>(); + + map.put(keys0.get(0), 1); + map.put(keys1.get(0), 2); + map.put(keys2.get(0), 3); + map.put(keys0.get(1), 4); + map.put(keys1.get(1), 5); + map.put(keys2.get(1), 6); + + spi.record(GridNearTxPrepareRequest.class); + + try (Transaction tx = ignite3.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) { + for (Map.Entry<Integer, Integer> e : map.entrySet()) + cache.put(e.getKey(), e.getValue()); + + tx.commit(); + } + + checkClientPrepareMessages(spi.recordedMessages(), 6); + + checkData(map, 4); + + cache.putAll(map); + + checkClientPrepareMessages(spi.recordedMessages(), 6); + + spi.record(null); + + checkData(map, 4); + + IgniteCache<Integer, Integer> cache0 = ignite0.cache(null); + + TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi(); + + spi0.record(GridNearTxPrepareRequest.class); + + cache0.putAll(map); + + spi0.record(null); + + List<Object> msgs = spi0.recordedMessages(); + + assertEquals(4, msgs.size()); + + for (Object msg : msgs) + assertFalse(((GridNearTxPrepareRequest)msg).firstClientRequest()); + + checkData(map, 4); + } + + /** + * @param msgs Messages. + * @param expCnt Expected number of messages. + */ + private void checkClientPrepareMessages(List<Object> msgs, int expCnt) { + assertEquals(expCnt, msgs.size()); + + assertTrue(((GridNearTxPrepareRequest)msgs.get(0)).firstClientRequest()); + + for (int i = 1; i < msgs.size(); i++) + assertFalse(((GridNearTxPrepareRequest) msgs.get(i)).firstClientRequest()); + } + + /** + * @throws Exception If failed. + */ public void testLockRemoveAfterClientFailed() throws Exception { ccfg = new CacheConfiguration(); @@ -376,7 +539,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac /** * @throws Exception If failed. */ - public void testPessimisticTxPutAllMultinode() throws Exception { + public void _testPessimisticTxPutAllMultinode() throws Exception { putAllMultinode(null, true); } @@ -579,12 +742,21 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac /** */ private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>(); + /** */ + private Class<?> recordCls; + + /** */ + private List<Object> recordedMsgs = new ArrayList<>(); + /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { if (msg instanceof GridIoMessage) { Object msg0 = ((GridIoMessage)msg).message(); synchronized (this) { + if (recordCls != null && msg0.getClass().equals(recordCls)) + recordedMsgs.add(msg0); + Set<UUID> blockNodes = blockCls.get(msg0.getClass()); if (F.contains(blockNodes, node.id())) { @@ -602,6 +774,28 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac } /** + * @param recordCls Message class to record. + */ + void record(@Nullable Class<?> recordCls) { + synchronized (this) { + this.recordCls = recordCls; + } + } + + /** + * @return Recorded messages. + */ + List<Object> recordedMessages() { + synchronized (this) { + List<Object> msgs = recordedMsgs; + + recordedMsgs = new ArrayList<>(); + + return msgs; + } + } + + /** * @param cls Message class. * @param nodeId Node ID. */