# ignite-23 remap for tx updates from client
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1d413965 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1d413965 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1d413965 Branch: refs/heads/ignite-943 Commit: 1d413965d06cd8188df39115701a69761f7ea998 Parents: 12761e4 Author: sboikov <sboi...@gridgain.com> Authored: Thu May 21 11:54:43 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu May 21 17:39:30 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 8 +- .../GridCachePartitionExchangeManager.java | 44 ++-- .../distributed/GridDistributedTxMapping.java | 17 ++ .../distributed/dht/GridDhtLockFuture.java | 10 +- .../dht/GridDhtTransactionalCacheAdapter.java | 222 ++++++++++++---- .../distributed/dht/GridDhtTxLocalAdapter.java | 8 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 3 +- .../dht/colocated/GridDhtColocatedCache.java | 12 +- .../colocated/GridDhtColocatedLockFuture.java | 184 ++++++++----- .../GridDhtPartitionsExchangeFuture.java | 16 +- .../distributed/near/GridNearLockFuture.java | 259 +++++++++++------- .../distributed/near/GridNearLockMapping.java | 17 ++ .../distributed/near/GridNearLockRequest.java | 68 +++-- .../distributed/near/GridNearLockResponse.java | 48 +++- .../near/GridNearOptimisticTxPrepareFuture.java | 77 ++++-- .../GridNearPessimisticTxPrepareFuture.java | 5 +- .../near/GridNearTransactionalCache.java | 4 +- .../cache/distributed/near/GridNearTxLocal.java | 43 ++- .../near/GridNearTxPrepareRequest.java | 72 +++-- .../near/GridNearTxPrepareResponse.java | 70 +++-- .../cache/transactions/IgniteInternalTx.java | 5 + .../cache/transactions/IgniteTxAdapter.java | 15 +- .../cache/transactions/IgniteTxHandler.java | 118 +++++++-- ...niteCacheClientNodeChangingTopologyTest.java | 263 ++++++++++++++++++- 24 files changed, 1182 insertions(+), 406 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 02f16c0..eef9fde 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -472,7 +472,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { req.miniId(), false, 0, - req.classError()); + req.classError(), + null); sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); } @@ -488,7 +489,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { req.miniId(), req.version(), req.version(), - null, null, null); + null, + null, + null, + null); res.error(req.classError()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 25e18db..41a13ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -706,9 +706,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionsExchangeFuture old = exchFuts.addx( fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs)); - if (old != null) + if (old != null) { fut = old; + if (reqs != null) + fut.cacheChangeRequests(reqs); + } + if (discoEvt != null) fut.onEvent(exchId, discoEvt); @@ -870,17 +874,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } else { if (msg.client()) { - IgniteInternalFuture<?> fut = affinityReadyFuture(msg.exchangeId().topologyVersion()); - - if (fut != null) { - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - processSinglePartitionClientUpdate(node, msg); - } - }); - } - else - processSinglePartitionClientUpdate(node, msg); + final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(), + null, + null); + + exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + // Finished future should reply only to sender client node. + exchFut.onReceive(node.id(), msg); + } + }); } else exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg); @@ -892,23 +895,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param node Node. - * @param msg Message. - */ - private void processSinglePartitionClientUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { - final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(), - null, - null); - - exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - // Finished future should reply only to sender client node. - exchFut.onReceive(node.id(), msg); - } - }); - } - - /** * @param node Node ID. * @param msg Message. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java index fded3c9..bd1dedf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -63,6 +63,9 @@ public class GridDistributedTxMapping implements Externalizable { /** {@code True} if mapping is for near caches, {@code false} otherwise. */ private boolean near; + /** {@code True} if this is first mapping for optimistic tx on client node. */ + private boolean clientFirst; + /** * Empty constructor required for {@link Externalizable}. */ @@ -108,6 +111,20 @@ public class GridDistributedTxMapping implements Externalizable { } /** + * @return {@code True} if this is first mapping for optimistic tx on client node. + */ + public boolean clientFirst() { + return clientFirst; + } + + /** + * @param clientFirst {@code True} if this is first mapping for optimistic tx on client node. + */ + public void clientFirst(boolean clientFirst) { + this.clientFirst = clientFirst; + } + + /** * @return {@code True} if mapping is for near caches, {@code false} otherwise. */ public boolean near() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index c57eded..bdaa552 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -47,7 +47,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*; /** * Cache lock future. */ -public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> +public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheMvccFuture<Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion { /** */ private static final long serialVersionUID = 0L; @@ -60,7 +60,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo /** Cache registry. */ @GridToStringExclude - private GridCacheContext<K, V> cctx; + private GridCacheContext<?, ?> cctx; /** Near node ID. */ private UUID nearNodeId; @@ -151,7 +151,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param skipStore Skip store flag. */ public GridDhtLockFuture( - GridCacheContext<K, V> cctx, + GridCacheContext<?, ?> cctx, UUID nearNodeId, GridCacheVersion nearLockVer, @NotNull AffinityTopologyVersion topVer, @@ -221,7 +221,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param cacheCtx Cache context. * @param invalidPart Partition to retry. */ - void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int invalidPart) { + void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int invalidPart) { invalidParts.add(invalidPart); // Register invalid partitions with transaction. @@ -1170,7 +1170,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param entries Entries to check. */ @SuppressWarnings({"ForLoopReplaceableByForEach"}) - private void evictReaders(GridCacheContext<K, V> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId, + private void evictReaders(GridCacheContext<?, ?> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId, @Nullable List<GridDhtCacheEntry> entries) { if (entries == null || keys == null || entries.isEmpty() || keys.isEmpty()) return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 26eef50..60e891c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -518,7 +518,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach return; } - // Group lock can be only started from local node, so we never start group lock transaction on remote node. IgniteInternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null); // Register listener just so we print out errors. @@ -534,8 +533,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) { assert nodeId != null; assert res != null; - GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(), - res.futureId()); + GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId()); if (fut == null) { if (log.isDebugEnabled()) @@ -604,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach assert tx != null; - GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>( + GridDhtLockFuture fut = new GridDhtLockFuture( ctx, tx.nearNodeId(), tx.nearXidVersion(), @@ -669,7 +667,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @return Future. */ public IgniteInternalFuture<GridNearLockResponse> lockAllAsync( - final GridCacheContext<K, V> cacheCtx, + final GridCacheContext<?, ?> cacheCtx, final ClusterNode nearNode, final GridNearLockRequest req, @Nullable final CacheEntryPredicate[] filter0) { @@ -719,26 +717,57 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (filter == null) filter = req.filter(); - GridDhtLockFuture<K, V> fut = null; + GridDhtLockFuture fut = null; if (!req.inTx()) { - fut = new GridDhtLockFuture<>(ctx, - nearNode.id(), - req.version(), - req.topologyVersion(), - cnt, - req.txRead(), - req.needReturnValue(), - req.timeout(), - tx, - req.threadId(), - req.accessTtl(), - filter, - req.skipStore()); + GridDhtPartitionTopology top = null; + + if (req.firstClientRequest()) { + assert CU.clientNode(nearNode); + + top = topology(); + + topology().readLock(); + } + + try { + if (top != null && !top.topologyVersion().equals(req.topologyVersion())) { + if (log.isDebugEnabled()) { + log.debug("Client topology version mismatch, need remap lock request [" + + "reqTopVer=" + req.topologyVersion() + + ", locTopVer=" + top.topologyVersion() + + ", req=" + req + ']'); + } + + GridNearLockResponse res = sendClientLockRemapResponse(nearNode, + req, + top.topologyVersion()); + + return new GridFinishedFuture<>(res); + } + + fut = new GridDhtLockFuture(ctx, + nearNode.id(), + req.version(), + req.topologyVersion(), + cnt, + req.txRead(), + req.needReturnValue(), + req.timeout(), + tx, + req.threadId(), + req.accessTtl(), + filter, + req.skipStore()); - // Add before mapping. - if (!ctx.mvcc().addFuture(fut)) - throw new IllegalStateException("Duplicate future ID: " + fut); + // Add before mapping. + if (!ctx.mvcc().addFuture(fut)) + throw new IllegalStateException("Duplicate future ID: " + fut); + } + finally { + if (top != null) + top.readUnlock(); + } } boolean timedout = false; @@ -788,45 +817,76 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // Handle implicit locks for pessimistic transactions. if (req.inTx()) { if (tx == null) { - tx = new GridDhtTxLocal( - ctx.shared(), - nearNode.id(), - req.version(), - req.futureId(), - req.miniId(), - req.threadId(), - req.implicitTx(), - req.implicitSingleTx(), - ctx.systemTx(), - false, - ctx.ioPolicy(), - PESSIMISTIC, - req.isolation(), - req.timeout(), - req.isInvalidate(), - false, - req.txSize(), - null, - req.subjectId(), - req.taskNameHash()); + GridDhtPartitionTopology top = null; - tx.syncCommit(req.syncCommit()); + if (req.firstClientRequest()) { + assert CU.clientNode(nearNode); - tx = ctx.tm().onCreated(null, tx); + top = topology(); - if (tx == null || !tx.init()) { - String msg = "Failed to acquire lock (transaction has been completed): " + - req.version(); + topology().readLock(); + } - U.warn(log, msg); + try { + if (top != null && !top.topologyVersion().equals(req.topologyVersion())) { + if (log.isDebugEnabled()) { + log.debug("Client topology version mismatch, need remap lock request [" + + "reqTopVer=" + req.topologyVersion() + + ", locTopVer=" + top.topologyVersion() + + ", req=" + req + ']'); + } - if (tx != null) - tx.rollback(); + GridNearLockResponse res = sendClientLockRemapResponse(nearNode, + req, + top.topologyVersion()); - return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg)); - } + return new GridFinishedFuture<>(res); + } - tx.topologyVersion(req.topologyVersion()); + tx = new GridDhtTxLocal( + ctx.shared(), + nearNode.id(), + req.version(), + req.futureId(), + req.miniId(), + req.threadId(), + req.implicitTx(), + req.implicitSingleTx(), + ctx.systemTx(), + false, + ctx.ioPolicy(), + PESSIMISTIC, + req.isolation(), + req.timeout(), + req.isInvalidate(), + false, + req.txSize(), + null, + req.subjectId(), + req.taskNameHash()); + + tx.syncCommit(req.syncCommit()); + + tx = ctx.tm().onCreated(null, tx); + + if (tx == null || !tx.init()) { + String msg = "Failed to acquire lock (transaction has been completed): " + + req.version(); + + U.warn(log, msg); + + if (tx != null) + tx.rollback(); + + return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg)); + } + + tx.topologyVersion(req.topologyVersion()); + } + finally { + if (top != null) + top.readUnlock(); + } } ctx.tm().txContext(tx); @@ -947,6 +1007,42 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } /** + * @param nearNode Client node. + * @param req Request. + * @param topVer Remap version. + * @return Response. + */ + private GridNearLockResponse sendClientLockRemapResponse(ClusterNode nearNode, + GridNearLockRequest req, + AffinityTopologyVersion topVer) { + assert topVer != null; + + GridNearLockResponse res = new GridNearLockResponse( + ctx.cacheId(), + req.version(), + req.futureId(), + req.miniId(), + false, + 0, + null, + topVer); + + try { + ctx.io().send(nearNode, res, ctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send client lock remap response, client node failed " + + "[node=" + nearNode + ", req=" + req + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send client lock remap response [node=" + nearNode + ", req=" + req + ']', e); + } + + return res; + } + + /** * @param nearNode Near node. * @param entries Entries. * @param req Lock request. @@ -968,7 +1064,13 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach try { // Send reply back to originating near node. GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(), - req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err); + req.version(), + req.futureId(), + req.miniId(), + tx != null && tx.onePhaseCommit(), + entries.size(), + err, + null); if (err == null) { res.pending(localDhtPendingVersions(entries, mappedVer)); @@ -1077,8 +1179,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach U.error(log, "Failed to get value for lock reply message for node [node=" + U.toShortString(nearNode) + ", req=" + req + ']', e); - return new GridNearLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false, - entries.size(), e); + return new GridNearLockResponse(ctx.cacheId(), + req.version(), + req.futureId(), + req.miniId(), + false, + entries.size(), + e, + null); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 54b59b8..90edb0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -52,15 +52,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { private static final long serialVersionUID = 0L; /** Near mappings. */ - protected Map<UUID, GridDistributedTxMapping> nearMap = - new ConcurrentHashMap8<>(); + protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>(); /** DHT mappings. */ - protected Map<UUID, GridDistributedTxMapping> dhtMap = - new ConcurrentHashMap8<>(); + protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>(); /** Mapped flag. */ - private AtomicBoolean mapped = new AtomicBoolean(); + protected AtomicBoolean mapped = new AtomicBoolean(); /** */ private long dhtThreadId; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 293cf95..af0fbdf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -582,7 +582,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter tx.writeVersion(), tx.invalidPartitions(), ret, - prepErr); + prepErr, + null); if (prepErr == null) { addDhtValues(res); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 05b3c7b..221b230 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -362,13 +362,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Nullable TransactionIsolation isolation, long accessTtl ) { - assert tx == null || tx instanceof GridNearTxLocal; + assert tx == null || tx instanceof GridNearTxLocal : tx; GridNearTxLocal txx = (GridNearTxLocal)tx; CacheOperationContext opCtx = ctx.operationContextPerCall(); - GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx, + GridDhtColocatedLockFuture fut = new GridDhtColocatedLockFuture(ctx, keys, txx, isRead, @@ -619,7 +619,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @return Lock future. */ IgniteInternalFuture<Exception> lockAllAsync( - final GridCacheContext<K, V> cacheCtx, + final GridCacheContext<?, ?> cacheCtx, @Nullable final GridNearTxLocal tx, final long threadId, final GridCacheVersion ver, @@ -700,7 +700,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @return Lock future. */ private IgniteInternalFuture<Exception> lockAllAsync0( - GridCacheContext<K, V> cacheCtx, + GridCacheContext<?, ?> cacheCtx, @Nullable final GridNearTxLocal tx, long threadId, final GridCacheVersion ver, @@ -715,7 +715,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte int cnt = keys.size(); if (tx == null) { - GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx, + GridDhtLockFuture fut = new GridDhtLockFuture(ctx, ctx.localNodeId(), ver, topVer, @@ -838,7 +838,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte assert nodeId != null; assert res != null; - GridDhtColocatedLockFuture<K, V> fut = (GridDhtColocatedLockFuture<K, V>)ctx.mvcc(). + GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc(). <Boolean>future(res.version(), res.futureId()); if (fut != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 372c517..a90c6e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.*; /** * Colocated cache lock future. */ -public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> +public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> { /** */ private static final long serialVersionUID = 0L; @@ -59,7 +59,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity /** Cache registry. */ @GridToStringExclude - private GridCacheContext<K, V> cctx; + private GridCacheContext<?, ?> cctx; /** Lock owner thread. */ @GridToStringInclude @@ -121,10 +121,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @param timeout Lock acquisition timeout. * @param accessTtl TTL for read operation. * @param filter Filter. - * @param skipStore + * @param skipStore Skip store flag. */ public GridDhtColocatedLockFuture( - GridCacheContext<K, V> cctx, + GridCacheContext<?, ?> cctx, Collection<KeyCacheObject> keys, @Nullable GridNearTxLocal tx, boolean read, @@ -550,7 +550,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity // Continue mapping on the same topology version as it was before. this.topVer.compareAndSet(null, topVer); - map(keys); + map(keys, false); markInitialized(); @@ -558,14 +558,16 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } // Must get topology snapshot and map on that version. - mapOnTopology(); + mapOnTopology(false); } /** * Acquires topology future and checks it completeness under the read lock. If it is not complete, * will asynchronously wait for it's completeness and then try again. + * + * @param remap Remap flag. */ - private void mapOnTopology() { + private void mapOnTopology(final boolean remap) { // We must acquire topology snapshot from the topology version future. cctx.topology().readLock(); @@ -589,19 +591,27 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity AffinityTopologyVersion topVer = fut.topologyVersion(); - if (tx != null) - tx.topologyVersion(topVer); + if (remap) { + if (tx != null) + tx.onRemap(topVer); + + this.topVer.set(topVer); + } + else { + if (tx != null) + tx.topologyVersion(topVer); - this.topVer.compareAndSet(null, topVer); + this.topVer.compareAndSet(null, topVer); + } - map(keys); + map(keys, remap); markInitialized(); } else { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - mapOnTopology(); + mapOnTopology(remap); } }); } @@ -617,8 +627,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * groups belonging to one primary node and locks for these groups are acquired sequentially. * * @param keys Keys. + * @param remap Remap flag. */ - private void map(Collection<KeyCacheObject> keys) { + private void map(Collection<KeyCacheObject> keys, boolean remap) { try { AffinityTopologyVersion topVer = this.topVer.get(); @@ -633,8 +644,12 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity return; } + boolean clientNode = cctx.kernalContext().clientNode(); + + assert !remap || (clientNode && !tx.hasRemoteLocks()); + // First assume this node is primary for all keys passed in. - if (mapAsPrimary(keys, topVer)) + if (!clientNode && mapAsPrimary(keys, topVer)) return; Deque<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>(); @@ -642,9 +657,18 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity // Assign keys to primary nodes. GridNearLockMapping map = null; + boolean first = true; + for (KeyCacheObject key : keys) { GridNearLockMapping updated = map(key, map, topVer); + if (first) { + if (clientNode) + updated.clientFirst(tx == null || !tx.hasRemoteLocks()); + + first = false; + } + // If new mapping was created, add to collection. if (updated != map) { mappings.add(updated); @@ -757,7 +781,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity inTx() ? tx.subjectId() : null, inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, - skipStore); + skipStore, + mapping.clientFirst()); mapping.request(req); } @@ -815,7 +840,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (hasRmtNodes) { trackable = true; - if (!cctx.mvcc().addFuture(this)) + if (!remap && !cctx.mvcc().addFuture(this)) throw new IllegalStateException("Duplicate future ID: " + this); } else @@ -1249,75 +1274,104 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity return; } - int i = 0; + if (res.clientRemapVersion() != null) { + assert cctx.kernalContext().clientNode(); + + IgniteInternalFuture<?> affFut = + cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion()); + + if (affFut != null && !affFut.isDone()) { + affFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + remap(); + } + }); + } + else + remap(); + } + else { + int i = 0; - for (KeyCacheObject k : keys) { - IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k); + for (KeyCacheObject k : keys) { + IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k); - CacheObject newVal = res.value(i); + CacheObject newVal = res.value(i); - GridCacheVersion dhtVer = res.dhtVersion(i); + GridCacheVersion dhtVer = res.dhtVersion(i); - if (newVal == null) { - if (oldValTup != null) { - if (oldValTup.get1().equals(dhtVer)) - newVal = oldValTup.get2(); + if (newVal == null) { + if (oldValTup != null) { + if (oldValTup.get1().equals(dhtVer)) + newVal = oldValTup.get2(); + } } - } - if (inTx()) { - IgniteTxEntry txEntry = tx.entry(cctx.txKey(k)); + if (inTx()) { + IgniteTxEntry txEntry = tx.entry(cctx.txKey(k)); - // In colocated cache we must receive responses only for detached entries. - assert txEntry.cached().detached(); + // In colocated cache we must receive responses only for detached entries. + assert txEntry.cached().detached() : txEntry; - txEntry.markLocked(); + txEntry.markLocked(); - GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached(); + GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached(); - if (res.dhtVersion(i) == null) { - onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + - "(will fail the lock): " + res)); + if (res.dhtVersion(i) == null) { + onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + + "(will fail the lock): " + res)); - return; + return; + } + + // Set value to detached entry. + entry.resetFromPrimary(newVal, dhtVer); + + tx.hasRemoteLocks(true); + + if (log.isDebugEnabled()) + log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); + } + else + cctx.mvcc().markExplicitOwner(k, threadId); + + if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + cctx.events().addEvent(cctx.affinity().partition(k), + k, + tx, + null, + EVT_CACHE_OBJECT_READ, + newVal, + newVal != null, + null, + false, + CU.subjectId(tx, cctx.shared()), + null, + tx == null ? null : tx.resolveTaskName()); } - // Set value to detached entry. - entry.resetFromPrimary(newVal, dhtVer); + i++; + } - if (log.isDebugEnabled()) - log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); + try { + proceedMapping(mappings); } - else - cctx.mvcc().markExplicitOwner(k, threadId); - - if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { - cctx.events().addEvent(cctx.affinity().partition(k), - k, - tx, - null, - EVT_CACHE_OBJECT_READ, - newVal, - newVal != null, - null, - false, - CU.subjectId(tx, cctx.shared()), - null, - tx == null ? null : tx.resolveTaskName()); + catch (IgniteCheckedException e) { + onDone(e); } - i++; + onDone(true); } + } + } - try { - proceedMapping(mappings); - } - catch (IgniteCheckedException e) { - onDone(e); - } + /** + * + */ + private void remap() { + mapOnTopology(true); - onDone(true); - } + onDone(true); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 94ca540..af7fa5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -229,15 +229,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT initFut = new GridFutureAdapter<>(); - // Grab all nodes with order of equal or less than last joined node. - ClusterNode node = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion()); - - oldestNode.set(node); - if (log.isDebugEnabled()) log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']'); } + /** + * @param reqs Cache change requests. + */ + public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) { + this.reqs = reqs; + } + /** {@inheritDoc} */ @Override public GridDiscoveryTopologySnapshot topologySnapshot() throws IgniteCheckedException { get(); @@ -461,6 +463,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert discoEvt != null : this; assert !dummy && !forcePreload : this; + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion()); + + oldestNode.set(oldest); + startCaches(); // True if client node joined or failed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 0ffb4e5..92498f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -45,7 +45,7 @@ import static org.apache.ignite.events.EventType.*; /** * Cache lock future. */ -public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> +public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheMvccFuture<Boolean> { /** */ private static final long serialVersionUID = 0L; @@ -58,7 +58,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B /** Cache registry. */ @GridToStringExclude - private GridCacheContext<K, V> cctx; + private GridCacheContext<?, ?> cctx; /** Lock owner thread. */ @GridToStringInclude @@ -135,7 +135,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @param skipStore skipStore */ public GridNearLockFuture( - GridCacheContext<K, V> cctx, + GridCacheContext<?, ?> cctx, Collection<KeyCacheObject> keys, @Nullable GridNearTxLocal tx, boolean read, @@ -184,15 +184,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @return Participating nodes. */ @Override public Collection<? extends ClusterNode> nodes() { - return - F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); + return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { + if (isMini(f)) + return ((MiniFuture)f).node(); - return cctx.discovery().localNode(); - } - }); + return cctx.discovery().localNode(); + } + }); } /** {@inheritDoc} */ @@ -682,7 +681,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B // Continue mapping on the same topology version as it was before. this.topVer.compareAndSet(null, topVer); - map(keys); + map(keys, false); markInitialized(); @@ -690,14 +689,16 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B } // Must get topology snapshot and map on that version. - mapOnTopology(); + mapOnTopology(false); } /** * Acquires topology future and checks it completeness under the read lock. If it is not complete, * will asynchronously wait for it's completeness and then try again. + * + * @param remap Remap flag. */ - void mapOnTopology() { + void mapOnTopology(final boolean remap) { // We must acquire topology snapshot from the topology version future. cctx.topology().readLock(); @@ -721,19 +722,27 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B AffinityTopologyVersion topVer = fut.topologyVersion(); - if (tx != null) - tx.topologyVersion(topVer); + if (remap) { + if (tx != null) + tx.onRemap(topVer); - this.topVer.compareAndSet(null, topVer); + this.topVer.set(topVer); + } + else { + if (tx != null) + tx.topologyVersion(topVer); + + this.topVer.compareAndSet(null, topVer); + } - map(keys); + map(keys, remap); markInitialized(); } else { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - mapOnTopology(); + mapOnTopology(remap); } }); } @@ -749,14 +758,15 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * groups belonging to one primary node and locks for these groups are acquired sequentially. * * @param keys Keys. + * @param remap Remap flag. */ - private void map(Iterable<KeyCacheObject> keys) { + private void map(Iterable<KeyCacheObject> keys, boolean remap) { try { AffinityTopologyVersion topVer = this.topVer.get(); assert topVer != null; - assert topVer.topologyVersion() > 0; + assert topVer.topologyVersion() > 0 : topVer; if (CU.affinityNodes(cctx, topVer).isEmpty()) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for near-only cache (all " + @@ -765,15 +775,27 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B return; } - ConcurrentLinkedDeque8<GridNearLockMapping> mappings = - new ConcurrentLinkedDeque8<>(); + boolean clientNode = cctx.kernalContext().clientNode(); + + assert !remap || (clientNode && !tx.hasRemoteLocks()); + + ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>(); // Assign keys to primary nodes. GridNearLockMapping map = null; + boolean first = true; + for (KeyCacheObject key : keys) { GridNearLockMapping updated = map(key, map, topVer); + if (first) { + if (clientNode) + updated.clientFirst(tx == null || !tx.hasRemoteLocks()); + + first = false; + } + // If new mapping was created, add to collection. if (updated != map) { mappings.add(updated); @@ -893,7 +915,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B inTx() ? tx.subjectId() : null, inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, - skipStore); + skipStore, + mapping.clientFirst()); mapping.request(req); } @@ -1197,7 +1220,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B /** * @return DHT cache. */ - private GridDhtTransactionalCacheAdapter<K, V> dht() { + private GridDhtTransactionalCacheAdapter<?, ?> dht() { return cctx.nearTx().dht(); } @@ -1356,110 +1379,144 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B return; } - int i = 0; + if (res.clientRemapVersion() != null) { + assert cctx.kernalContext().clientNode(); - AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get(); + IgniteInternalFuture<?> affFut = + cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion()); - for (KeyCacheObject k : keys) { - while (true) { - GridNearCacheEntry entry = cctx.near().entryExx(k, topVer); + if (affFut != null && !affFut.isDone()) { + affFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + remap(); + } + }); + } + else + remap(); + } + else { + int i = 0; - try { - if (res.dhtVersion(i) == null) { - onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + - "(will fail the lock): " + res)); + AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get(); - return; - } + for (KeyCacheObject k : keys) { + while (true) { + GridNearCacheEntry entry = cctx.near().entryExx(k, topVer); - IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key()); + try { + if (res.dhtVersion(i) == null) { + onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + + "(will fail the lock): " + res)); - CacheObject oldVal = entry.rawGet(); - boolean hasOldVal = false; - CacheObject newVal = res.value(i); + return; + } - boolean readRecordable = false; + IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key()); - if (retval) { - readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ); + CacheObject oldVal = entry.rawGet(); + boolean hasOldVal = false; + CacheObject newVal = res.value(i); - if (readRecordable) - hasOldVal = entry.hasValue(); - } + boolean readRecordable = false; - GridCacheVersion dhtVer = res.dhtVersion(i); - GridCacheVersion mappedVer = res.mappedVersion(i); + if (retval) { + readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ); + + if (readRecordable) + hasOldVal = entry.hasValue(); + } - if (newVal == null) { - if (oldValTup != null) { - if (oldValTup.get1().equals(dhtVer)) - newVal = oldValTup.get2(); + GridCacheVersion dhtVer = res.dhtVersion(i); + GridCacheVersion mappedVer = res.mappedVersion(i); - oldVal = oldValTup.get2(); + if (newVal == null) { + if (oldValTup != null) { + if (oldValTup.get1().equals(dhtVer)) + newVal = oldValTup.get2(); + + oldVal = oldValTup.get2(); + } } - } - // Lock is held at this point, so we can set the - // returned value if any. - entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer); + // Lock is held at this point, so we can set the + // returned value if any. + entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer); - if (inTx() && implicitTx() && tx.onePhaseCommit()) { - boolean pass = res.filterResult(i); + if (inTx()) { + tx.hasRemoteLocks(true); - tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr()); - } + if (implicitTx() && tx.onePhaseCommit()) { + boolean pass = res.filterResult(i); - entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(), - res.pending()); - - if (retval) { - if (readRecordable) - cctx.events().addEvent( - entry.partition(), - entry.key(), - tx, - null, - EVT_CACHE_OBJECT_READ, - newVal, - newVal != null, - oldVal, - hasOldVal, - CU.subjectId(tx, cctx.shared()), - null, - inTx() ? tx.resolveTaskName() : null); - - if (cctx.cache().configuration().isStatisticsEnabled()) - cctx.cache().metrics0().onRead(false); - } + tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr()); + } + } - if (log.isDebugEnabled()) - log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); + entry.readyNearLock(lockVer, + mappedVer, + res.committedVersions(), + res.rolledbackVersions(), + res.pending()); + + if (retval) { + if (readRecordable) + cctx.events().addEvent( + entry.partition(), + entry.key(), + tx, + null, + EVT_CACHE_OBJECT_READ, + newVal, + newVal != null, + oldVal, + hasOldVal, + CU.subjectId(tx, cctx.shared()), + null, + inTx() ? tx.resolveTaskName() : null); + + if (cctx.cache().configuration().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(false); + } - break; // Inner while loop. - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to add candidates because entry was removed (will renew)."); + if (log.isDebugEnabled()) + log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); - // Replace old entry with new one. - entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key())); + break; // Inner while loop. + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to add candidates because entry was removed (will renew)."); + + // Replace old entry with new one. + entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key())); + } } + + i++; } - i++; - } + try { + proceedMapping(mappings); + } + catch (IgniteCheckedException e) { + onDone(e); + } - try { - proceedMapping(mappings); - } - catch (IgniteCheckedException e) { - onDone(e); + onDone(true); } - - onDone(true); } } + /** + * + */ + private void remap() { + mapOnTopology(true); + + onDone(true); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java index 51000ef..3ea5b7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java @@ -44,6 +44,9 @@ public class GridNearLockMapping { @GridToStringInclude private Collection<KeyCacheObject> distributedKeys; + /** {@code True} if this is first mapping for lock operation on client node. */ + private boolean clientFirst; + /** * Creates near lock mapping for specified node and key. * @@ -60,6 +63,20 @@ public class GridNearLockMapping { } /** + * @return {@code True} if this is first mapping for lock operation on client node. + */ + public boolean clientFirst() { + return clientFirst; + } + + /** + * @param clientFirst {@code True} if this is first mapping for lock operation on client node. + */ + public void clientFirst(boolean clientFirst) { + this.clientFirst = clientFirst; + } + + /** * @return Node to which keys are mapped. */ public ClusterNode node() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index e71dd65..81184a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -80,6 +80,9 @@ public class GridNearLockRequest extends GridDistributedLockRequest { /** Flag indicating whether cache operation requires a previous value. */ private boolean retVal; + /** {@code True} if first lock request for lock operation sent from client node. */ + private boolean firstClientReq; + /** * Empty constructor required for {@link Externalizable}. */ @@ -98,6 +101,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { * @param implicitTx Flag to indicate that transaction is implicit. * @param implicitSingleTx Implicit-transaction-with-one-key flag. * @param isRead Indicates whether implicit lock is for read or write operation. + * @param retVal Return value flag. * @param isolation Transaction isolation. * @param isInvalidate Invalidation flag. * @param timeout Lock timeout. @@ -108,6 +112,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { * @param taskNameHash Task name hash code. * @param accessTtl TTL for read operation. * @param skipStore Skip store flag. + * @param firstClientReq {@code True} if first lock request for lock operation sent from client node. */ public GridNearLockRequest( int cacheId, @@ -130,7 +135,8 @@ public class GridNearLockRequest extends GridDistributedLockRequest { @Nullable UUID subjId, int taskNameHash, long accessTtl, - boolean skipStore + boolean skipStore, + boolean firstClientReq ) { super( cacheId, @@ -158,11 +164,19 @@ public class GridNearLockRequest extends GridDistributedLockRequest { this.taskNameHash = taskNameHash; this.accessTtl = accessTtl; this.retVal = retVal; + this.firstClientReq = firstClientReq; dhtVers = new GridCacheVersion[keyCnt]; } /** + * @return {@code True} if first lock request for lock operation sent from client node. + */ + public boolean firstClientRequest() { + return firstClientReq; + } + + /** * @return Topology version. */ @Override public AffinityTopologyVersion topologyVersion() { @@ -368,60 +382,66 @@ public class GridNearLockRequest extends GridDistributedLockRequest { writer.incrementState(); case 24: - if (!writer.writeBoolean("hasTransforms", hasTransforms)) + if (!writer.writeBoolean("firstClientReq", firstClientReq)) return false; writer.incrementState(); case 25: - if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx)) + if (!writer.writeBoolean("hasTransforms", hasTransforms)) return false; writer.incrementState(); case 26: - if (!writer.writeBoolean("implicitTx", implicitTx)) + if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx)) return false; writer.incrementState(); case 27: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeBoolean("implicitTx", implicitTx)) return false; writer.incrementState(); case 28: - if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 29: - if (!writer.writeBoolean("retVal", retVal)) + if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) return false; writer.incrementState(); case 30: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("retVal", retVal)) return false; writer.incrementState(); case 31: - if (!writer.writeBoolean("syncCommit", syncCommit)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 32: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeBoolean("syncCommit", syncCommit)) return false; writer.incrementState(); case 33: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 34: if (!writer.writeMessage("topVer", topVer)) return false; @@ -468,7 +488,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 24: - hasTransforms = reader.readBoolean("hasTransforms"); + firstClientReq = reader.readBoolean("firstClientReq"); if (!reader.isLastRead()) return false; @@ -476,7 +496,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 25: - implicitSingleTx = reader.readBoolean("implicitSingleTx"); + hasTransforms = reader.readBoolean("hasTransforms"); if (!reader.isLastRead()) return false; @@ -484,7 +504,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 26: - implicitTx = reader.readBoolean("implicitTx"); + implicitSingleTx = reader.readBoolean("implicitSingleTx"); if (!reader.isLastRead()) return false; @@ -492,7 +512,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 27: - miniId = reader.readIgniteUuid("miniId"); + implicitTx = reader.readBoolean("implicitTx"); if (!reader.isLastRead()) return false; @@ -500,7 +520,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 28: - onePhaseCommit = reader.readBoolean("onePhaseCommit"); + miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) return false; @@ -508,7 +528,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 29: - retVal = reader.readBoolean("retVal"); + onePhaseCommit = reader.readBoolean("onePhaseCommit"); if (!reader.isLastRead()) return false; @@ -516,7 +536,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 30: - subjId = reader.readUuid("subjId"); + retVal = reader.readBoolean("retVal"); if (!reader.isLastRead()) return false; @@ -524,7 +544,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 31: - syncCommit = reader.readBoolean("syncCommit"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -532,7 +552,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 32: - taskNameHash = reader.readInt("taskNameHash"); + syncCommit = reader.readBoolean("syncCommit"); if (!reader.isLastRead()) return false; @@ -540,6 +560,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 33: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 34: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -559,7 +587,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 34; + return 35; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java index 20928de..f324198 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -58,6 +59,9 @@ public class GridNearLockResponse extends GridDistributedLockResponse { /** Filter evaluation results for fast-commit transactions. */ private boolean[] filterRes; + /** {@code True} if client node should remap lock request. */ + private AffinityTopologyVersion clientRemapVer; + /** * Empty constructor (required by {@link Externalizable}). */ @@ -73,6 +77,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { * @param filterRes {@code True} if need to allocate array for filter evaluation results. * @param cnt Count. * @param err Error. + * @param clientRemapVer {@code True} if client node should remap lock request. */ public GridNearLockResponse( int cacheId, @@ -81,13 +86,15 @@ public class GridNearLockResponse extends GridDistributedLockResponse { IgniteUuid miniId, boolean filterRes, int cnt, - Throwable err + Throwable err, + AffinityTopologyVersion clientRemapVer ) { super(cacheId, lockVer, futId, cnt, err); assert miniId != null; this.miniId = miniId; + this.clientRemapVer = clientRemapVer; dhtVers = new GridCacheVersion[cnt]; mappedVers = new GridCacheVersion[cnt]; @@ -97,6 +104,13 @@ public class GridNearLockResponse extends GridDistributedLockResponse { } /** + * @return {@code True} if client node should remap lock request. + */ + @Nullable public AffinityTopologyVersion clientRemapVersion() { + return clientRemapVer; + } + + /** * Gets pending versions that are less than {@link #version()}. * * @return Pending versions. @@ -192,30 +206,36 @@ public class GridNearLockResponse extends GridDistributedLockResponse { switch (writer.state()) { case 11: - if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("clientRemapVer", clientRemapVer)) return false; writer.incrementState(); case 12: - if (!writer.writeBooleanArray("filterRes", filterRes)) + if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 13: - if (!writer.writeObjectArray("mappedVers", mappedVers, MessageCollectionItemType.MSG)) + if (!writer.writeBooleanArray("filterRes", filterRes)) return false; writer.incrementState(); case 14: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeObjectArray("mappedVers", mappedVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 15: + if (!writer.writeIgniteUuid("miniId", miniId)) + return false; + + writer.incrementState(); + + case 16: if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG)) return false; @@ -238,7 +258,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { switch (reader.state()) { case 11: - dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class); + clientRemapVer = reader.readMessage("clientRemapVer"); if (!reader.isLastRead()) return false; @@ -246,7 +266,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { reader.incrementState(); case 12: - filterRes = reader.readBooleanArray("filterRes"); + dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class); if (!reader.isLastRead()) return false; @@ -254,7 +274,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { reader.incrementState(); case 13: - mappedVers = reader.readObjectArray("mappedVers", MessageCollectionItemType.MSG, GridCacheVersion.class); + filterRes = reader.readBooleanArray("filterRes"); if (!reader.isLastRead()) return false; @@ -262,7 +282,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { reader.incrementState(); case 14: - miniId = reader.readIgniteUuid("miniId"); + mappedVers = reader.readObjectArray("mappedVers", MessageCollectionItemType.MSG, GridCacheVersion.class); if (!reader.isLastRead()) return false; @@ -270,6 +290,14 @@ public class GridNearLockResponse extends GridDistributedLockResponse { reader.incrementState(); case 15: + miniId = reader.readIgniteUuid("miniId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 16: pending = reader.readCollection("pending", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -289,7 +317,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 16; + return 17; } /** {@inheritDoc} */