http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 10b84e2..adea9e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -54,7 +54,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap private GridDhtPartitionTopology top; /** Preloader. */ - protected GridCachePreloader<K, V> preldr; + protected GridCachePreloader preldr; /** Multi tx future holder. */ private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> multiTxHolder = new ThreadLocal<>(); @@ -75,7 +75,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) { super(ctx, ctx.config().getStartSize()); - top = new GridDhtPartitionTopologyImpl<>(ctx); + top = new GridDhtPartitionTopologyImpl(ctx); } /** @@ -87,7 +87,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { super(ctx, map); - top = new GridDhtPartitionTopologyImpl<>(ctx); + top = new GridDhtPartitionTopologyImpl(ctx); } /** {@inheritDoc} */ @@ -168,17 +168,17 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** {@inheritDoc} */ - @Override public GridCachePreloader<K, V> preloader() { + @Override public GridCachePreloader preloader() { return preldr; } /** * @return DHT preloader. */ - public GridDhtPreloader<K, V> dhtPreloader() { + public GridDhtPreloader dhtPreloader() { assert preldr instanceof GridDhtPreloader; - return (GridDhtPreloader<K, V>)preldr; + return (GridDhtPreloader)preldr; } /** @@ -932,6 +932,21 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** + * @param expVer Expected topology version. + * @param curVer Current topology version. + * @return {@code True} if cache affinity changed and operation should be remapped. + */ + protected final boolean needRemap(AffinityTopologyVersion expVer, AffinityTopologyVersion curVer) { + if (expVer.equals(curVer)) + return false; + + Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer); + Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer); + + return !cacheNodes0.equals(cacheNodes1); + } + + /** * @param primary If {@code true} includes primary entries. * @param backup If {@code true} includes backup entries. * @return Local entries iterator.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index c9a7af8..89b85c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -292,12 +292,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { return ret; } - /** - * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition. - */ + /** {@inheritDoc} */ @Override public void onUnlock() { - super.onUnlock(); - locPart.onUnlock(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index f6f930e..742fbfe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -295,6 +295,11 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col if (info == null) continue; + boolean addReader = (!e.deleted() && k.getValue() && !skipVals); + + if (addReader) + e.unswap(false); + // Register reader. If there are active transactions for this entry, // then will wait for their completion before proceeding. // TODO: GG-4003: @@ -303,8 +308,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col // TODO: To fix, check that reader is contained in the list of readers once // TODO: again after the returned future completes - if not, try again. // TODO: Also, why is info read before transactions are complete, and not after? - IgniteInternalFuture<Boolean> f = (!e.deleted() && k.getValue() && !skipVals) ? - e.addReader(reader, msgId, topVer) : null; + IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null; if (f != null) { if (txFut == null) @@ -317,6 +321,9 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col break; } + catch (IgniteCheckedException err) { + return new GridFinishedFuture<>(err); + } catch (GridCacheEntryRemovedException ignore) { if (log.isDebugEnabled()) log.debug("Got removed entry when getting a DHT value: " + e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index c57eded..bdaa552 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -47,7 +47,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*; /** * Cache lock future. */ -public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> +public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheMvccFuture<Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion { /** */ private static final long serialVersionUID = 0L; @@ -60,7 +60,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo /** Cache registry. */ @GridToStringExclude - private GridCacheContext<K, V> cctx; + private GridCacheContext<?, ?> cctx; /** Near node ID. */ private UUID nearNodeId; @@ -151,7 +151,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param skipStore Skip store flag. */ public GridDhtLockFuture( - GridCacheContext<K, V> cctx, + GridCacheContext<?, ?> cctx, UUID nearNodeId, GridCacheVersion nearLockVer, @NotNull AffinityTopologyVersion topVer, @@ -221,7 +221,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param cacheCtx Cache context. * @param invalidPart Partition to retry. */ - void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int invalidPart) { + void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int invalidPart) { invalidParts.add(invalidPart); // Register invalid partitions with transaction. @@ -1170,7 +1170,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param entries Entries to check. */ @SuppressWarnings({"ForLoopReplaceableByForEach"}) - private void evictReaders(GridCacheContext<K, V> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId, + private void evictReaders(GridCacheContext<?, ?> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId, @Nullable List<GridDhtCacheEntry> entries) { if (entries == null || keys == null || entries.isEmpty() || keys.isEmpty()) return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 073e0e7..374ab87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -41,7 +41,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * Partition topology. */ @GridToStringExclude -class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { +class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** If true, then check consistency. */ private static final boolean CONSISTENCY_CHECK = false; @@ -49,7 +49,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { private static final boolean FULL_MAP_DEBUG = false; /** Context. */ - private final GridCacheContext<K, V> cctx; + private final GridCacheContext<?, ?> cctx; /** Logger. */ private final IgniteLogger log; @@ -85,7 +85,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { /** * @param cctx Context. */ - GridDhtPartitionTopologyImpl(GridCacheContext<K, V> cctx) { + GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx) { assert cctx != null; this.cctx = cctx; @@ -239,7 +239,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx.shared(), topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + + assert oldest != null; if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); @@ -247,7 +249,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); // If this is the oldest node. - if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId())) { + if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); @@ -274,7 +276,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { if (cctx.rebalanceEnabled()) { for (int p = 0; p < num; p++) { // If this is the first node in grid. - boolean added = exchFut.isCacheAdded(cctx.cacheId()); + boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()); if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId()) && exchId.isJoined()) || added) { assert exchId.isJoined() || added; @@ -604,7 +606,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { try { return new GridDhtPartitionMap(cctx.nodeId(), updateSeq.get(), - F.viewReadOnly(locParts, CU.<K, V>part2state()), true); + F.viewReadOnly(locParts, CU.part2state()), true); } finally { lock.readLock().unlock(); @@ -660,13 +662,15 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { * @return List of nodes for the partition. */ private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null; + Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null; lock.readLock().lock(); try { assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + - ", allIds=" + allIds + ", node2part=" + node2part + ']'; + ", allIds=" + allIds + + ", node2part=" + node2part + + ", cache=" + cctx.name() + ']'; Collection<UUID> nodeIds = part2node.get(p); @@ -738,7 +742,11 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { try { assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + - ", locNodeId=" + cctx.localNode().id() + ", locName=" + cctx.gridName() + ']'; + ", cache=" + cctx.name() + + ", started=" + cctx.started() + + ", stopping=" + stopping + + ", locNodeId=" + cctx.localNode().id() + + ", locName=" + cctx.gridName() + ']'; GridDhtPartitionFullMap m = node2part; @@ -756,6 +764,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); + assert partMap != null; + lock.writeLock().lock(); try { @@ -1024,7 +1034,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { assert nodeId.equals(cctx.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + + assert oldest != null; // If this node became the oldest node. if (oldest.id().equals(cctx.nodeId())) { @@ -1074,7 +1086,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { assert nodeId != null; assert lock.writeLock().isHeldByCurrentThread(); - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + + assert oldest != null; ClusterNode loc = cctx.localNode(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 26eef50..703daf9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -81,7 +81,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach @Override public void start() throws IgniteCheckedException { super.start(); - preldr = new GridDhtPreloader<>(ctx); + preldr = new GridDhtPreloader(ctx); preldr.start(); @@ -518,7 +518,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach return; } - // Group lock can be only started from local node, so we never start group lock transaction on remote node. IgniteInternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null); // Register listener just so we print out errors. @@ -534,8 +533,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) { assert nodeId != null; assert res != null; - GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(), - res.futureId()); + GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId()); if (fut == null) { if (log.isDebugEnabled()) @@ -604,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach assert tx != null; - GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>( + GridDhtLockFuture fut = new GridDhtLockFuture( ctx, tx.nearNodeId(), tx.nearXidVersion(), @@ -669,7 +667,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @return Future. */ public IgniteInternalFuture<GridNearLockResponse> lockAllAsync( - final GridCacheContext<K, V> cacheCtx, + final GridCacheContext<?, ?> cacheCtx, final ClusterNode nearNode, final GridNearLockRequest req, @Nullable final CacheEntryPredicate[] filter0) { @@ -719,26 +717,57 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (filter == null) filter = req.filter(); - GridDhtLockFuture<K, V> fut = null; + GridDhtLockFuture fut = null; if (!req.inTx()) { - fut = new GridDhtLockFuture<>(ctx, - nearNode.id(), - req.version(), - req.topologyVersion(), - cnt, - req.txRead(), - req.needReturnValue(), - req.timeout(), - tx, - req.threadId(), - req.accessTtl(), - filter, - req.skipStore()); + GridDhtPartitionTopology top = null; + + if (req.firstClientRequest()) { + assert CU.clientNode(nearNode); + + top = topology(); + + topology().readLock(); + } + + try { + if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) { + if (log.isDebugEnabled()) { + log.debug("Client topology version mismatch, need remap lock request [" + + "reqTopVer=" + req.topologyVersion() + + ", locTopVer=" + top.topologyVersion() + + ", req=" + req + ']'); + } + + GridNearLockResponse res = sendClientLockRemapResponse(nearNode, + req, + top.topologyVersion()); + + return new GridFinishedFuture<>(res); + } + + fut = new GridDhtLockFuture(ctx, + nearNode.id(), + req.version(), + req.topologyVersion(), + cnt, + req.txRead(), + req.needReturnValue(), + req.timeout(), + tx, + req.threadId(), + req.accessTtl(), + filter, + req.skipStore()); - // Add before mapping. - if (!ctx.mvcc().addFuture(fut)) - throw new IllegalStateException("Duplicate future ID: " + fut); + // Add before mapping. + if (!ctx.mvcc().addFuture(fut)) + throw new IllegalStateException("Duplicate future ID: " + fut); + } + finally { + if (top != null) + top.readUnlock(); + } } boolean timedout = false; @@ -788,45 +817,76 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // Handle implicit locks for pessimistic transactions. if (req.inTx()) { if (tx == null) { - tx = new GridDhtTxLocal( - ctx.shared(), - nearNode.id(), - req.version(), - req.futureId(), - req.miniId(), - req.threadId(), - req.implicitTx(), - req.implicitSingleTx(), - ctx.systemTx(), - false, - ctx.ioPolicy(), - PESSIMISTIC, - req.isolation(), - req.timeout(), - req.isInvalidate(), - false, - req.txSize(), - null, - req.subjectId(), - req.taskNameHash()); + GridDhtPartitionTopology top = null; - tx.syncCommit(req.syncCommit()); + if (req.firstClientRequest()) { + assert CU.clientNode(nearNode); - tx = ctx.tm().onCreated(null, tx); + top = topology(); - if (tx == null || !tx.init()) { - String msg = "Failed to acquire lock (transaction has been completed): " + - req.version(); + topology().readLock(); + } - U.warn(log, msg); + try { + if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) { + if (log.isDebugEnabled()) { + log.debug("Client topology version mismatch, need remap lock request [" + + "reqTopVer=" + req.topologyVersion() + + ", locTopVer=" + top.topologyVersion() + + ", req=" + req + ']'); + } - if (tx != null) - tx.rollback(); + GridNearLockResponse res = sendClientLockRemapResponse(nearNode, + req, + top.topologyVersion()); - return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg)); - } + return new GridFinishedFuture<>(res); + } - tx.topologyVersion(req.topologyVersion()); + tx = new GridDhtTxLocal( + ctx.shared(), + nearNode.id(), + req.version(), + req.futureId(), + req.miniId(), + req.threadId(), + req.implicitTx(), + req.implicitSingleTx(), + ctx.systemTx(), + false, + ctx.ioPolicy(), + PESSIMISTIC, + req.isolation(), + req.timeout(), + req.isInvalidate(), + false, + req.txSize(), + null, + req.subjectId(), + req.taskNameHash()); + + tx.syncCommit(req.syncCommit()); + + tx = ctx.tm().onCreated(null, tx); + + if (tx == null || !tx.init()) { + String msg = "Failed to acquire lock (transaction has been completed): " + + req.version(); + + U.warn(log, msg); + + if (tx != null) + tx.rollback(); + + return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg)); + } + + tx.topologyVersion(req.topologyVersion()); + } + finally { + if (top != null) + top.readUnlock(); + } } ctx.tm().txContext(tx); @@ -947,6 +1007,42 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } /** + * @param nearNode Client node. + * @param req Request. + * @param topVer Remap version. + * @return Response. + */ + private GridNearLockResponse sendClientLockRemapResponse(ClusterNode nearNode, + GridNearLockRequest req, + AffinityTopologyVersion topVer) { + assert topVer != null; + + GridNearLockResponse res = new GridNearLockResponse( + ctx.cacheId(), + req.version(), + req.futureId(), + req.miniId(), + false, + 0, + null, + topVer); + + try { + ctx.io().send(nearNode, res, ctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send client lock remap response, client node failed " + + "[node=" + nearNode + ", req=" + req + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send client lock remap response [node=" + nearNode + ", req=" + req + ']', e); + } + + return res; + } + + /** * @param nearNode Near node. * @param entries Entries. * @param req Lock request. @@ -968,7 +1064,13 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach try { // Send reply back to originating near node. GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(), - req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err); + req.version(), + req.futureId(), + req.miniId(), + tx != null && tx.onePhaseCommit(), + entries.size(), + err, + null); if (err == null) { res.pending(localDhtPendingVersions(entries, mappedVer)); @@ -1077,8 +1179,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach U.error(log, "Failed to get value for lock reply message for node [node=" + U.toShortString(nearNode) + ", req=" + req + ']', e); - return new GridNearLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false, - entries.size(), e); + return new GridNearLockResponse(ctx.cacheId(), + req.version(), + req.futureId(), + req.miniId(), + false, + entries.size(), + e, + null); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 54b59b8..90edb0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -52,15 +52,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { private static final long serialVersionUID = 0L; /** Near mappings. */ - protected Map<UUID, GridDistributedTxMapping> nearMap = - new ConcurrentHashMap8<>(); + protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>(); /** DHT mappings. */ - protected Map<UUID, GridDistributedTxMapping> dhtMap = - new ConcurrentHashMap8<>(); + protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>(); /** Mapped flag. */ - private AtomicBoolean mapped = new AtomicBoolean(); + protected AtomicBoolean mapped = new AtomicBoolean(); /** */ private long dhtThreadId; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 293cf95..af0fbdf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -582,7 +582,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter tx.writeVersion(), tx.invalidPartitions(), ret, - prepErr); + prepErr, + null); if (prepErr == null) { addDhtValues(res); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 8bbfe96..8630421 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -171,7 +171,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { metrics = m; - preldr = new GridDhtPreloader<>(ctx); + preldr = new GridDhtPreloader(ctx); preldr.start(); @@ -737,6 +737,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable final CacheEntryPredicate[] filter, final boolean waitTopFut ) { + assert ctx.updatesAllowed(); + if (map != null && keyCheck) validateCacheKeys(map.keySet()); @@ -793,6 +795,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean rawRetval, @Nullable final CacheEntryPredicate[] filter ) { + assert ctx.updatesAllowed(); + final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -1024,9 +1028,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { IgniteCacheExpiryPolicy expiry = null; try { - // If batch store update is enabled, we need to lock all entries. - // First, need to acquire locks on cache entries, then check filter. - List<GridDhtCacheEntry> locked = lockEntries(keys, req.topologyVersion()); + List<GridDhtCacheEntry> locked = null; Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; try { @@ -1043,11 +1045,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } // Do not check topology version for CLOCK versioning since - // partition exchange will wait for near update future. + // partition exchange will wait for near update future (if future is on server node). // Also do not check topology version if topology was locked on near node by // external transaction or explicit lock. - if (topology().topologyVersion().equals(req.topologyVersion()) || req.topologyLocked() || - ctx.config().getAtomicWriteOrderMode() == CLOCK) { + if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() || + !needRemap(req.topologyVersion(), topology().topologyVersion())) { ClusterNode node = ctx.discovery().node(nodeId); if (node == null) { @@ -1056,13 +1058,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return; } + // If batch store update is enabled, we need to lock all entries. + // First, need to acquire locks on cache entries, then check filter. + locked = lockEntries(keys, req.topologyVersion()); + boolean hasNear = ctx.discovery().cacheNearNode(node, name()); GridCacheVersion ver = req.updateVersion(); if (ver == null) { // Assign next version for update inside entries lock. - ver = ctx.versions().next(req.topologyVersion()); + ver = ctx.versions().next(topology().topologyVersion()); if (hasNear) res.nearVersion(ver); @@ -1105,7 +1111,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { retVal = updRes.invokeResults(); } else { - UpdateSingleResult<K, V> updRes = updateSingle(node, + UpdateSingleResult updRes = updateSingle(node, hasNear, req, res, @@ -1144,7 +1150,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { e.printStackTrace(); } finally { - unlockEntries(locked, req.topologyVersion()); + if (locked != null) + unlockEntries(locked, req.topologyVersion()); // Enqueue if necessary after locks release. if (deleted != null) { @@ -1157,7 +1164,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } catch (GridDhtInvalidPartitionException ignore) { - assert ctx.config().getAtomicWriteOrderMode() == PRIMARY; + assert !req.fastMap() || req.clientRequest() : req; if (log.isDebugEnabled()) log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req); @@ -1605,7 +1612,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @return Return value. * @throws GridCacheEntryRemovedException Should be never thrown. */ - private UpdateSingleResult<K, V> updateSingle( + private UpdateSingleResult updateSingle( ClusterNode node, boolean hasNear, GridNearAtomicUpdateRequest req, @@ -1799,7 +1806,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } - return new UpdateSingleResult<>(retVal, deleted, dhtFut); + return new UpdateSingleResult(retVal, deleted, dhtFut); } /** @@ -2572,7 +2579,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * Result of {@link GridDhtAtomicCache#updateSingle} execution. */ - private static class UpdateSingleResult<K, V> { + private static class UpdateSingleResult { /** */ private final GridCacheReturn retVal; @@ -2772,14 +2779,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void onTimeout() { if (guard.compareAndSet(false, true)) { - writeLock().lock(); + ctx.closures().runLocalSafe(new Runnable() { + @Override public void run() { + writeLock().lock(); - try { - finish(); - } - finally { - writeLock().unlock(); - } + try { + finish(); + } + finally { + writeLock().unlock(); + } + } + }); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 40ab104..ff8454e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -86,6 +86,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Future keys. */ private Collection<KeyCacheObject> keys; + /** */ + private boolean waitForExchange; + /** * @param cctx Cache context. * @param completionCb Callback to invoke when future is completed. @@ -113,6 +116,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class); keys = new ArrayList<>(updateReq.keys().size()); + + boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()); + + waitForExchange = !topLocked; } /** {@inheritDoc} */ @@ -164,8 +171,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** {@inheritDoc} */ @Override public boolean waitForPartitionExchange() { - // Wait dht update futures in PRIMARY mode. - return cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY; + return waitForExchange; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 76e05e5..07f5ecf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -128,6 +128,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Fast map flag. */ private final boolean fastMap; + /** */ + private boolean fastMapRemap; + + /** */ + private GridCacheVersion updVer; + /** Near cache flag. */ private final boolean nearEnabled; @@ -304,11 +310,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); if (topVer == null) - mapOnTopology(keys, false, null, waitTopFut); + mapOnTopology(null, false, null, waitTopFut); else { topLocked = true; - map0(topVer, keys, false, null); + map0(topVer, null, false, null); } } @@ -343,9 +349,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> */ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { if (res.remapKeys() != null) { - assert cctx.config().getAtomicWriteOrderMode() == PRIMARY; + assert !fastMap || cctx.kernalContext().clientNode(); + + Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys(); - mapOnTopology(res.remapKeys(), true, nodeId, true); + mapOnTopology(remapKeys, true, nodeId, true); return; } @@ -454,9 +462,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> else { if (waitTopFut) { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override - public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - mapOnTopology(keys, remap, oldNodeId, waitTopFut); + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(keys, remap, oldNodeId, waitTopFut); + } + }); } }); } @@ -476,29 +487,43 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** * Checks if future is ready to be completed. */ - private synchronized void checkComplete() { - if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) { - CachePartialUpdateCheckedException err0 = err; + private void checkComplete() { + boolean remap = false; - if (err0 != null) - onDone(err0); - else - onDone(opRes); + synchronized (this) { + if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) { + CachePartialUpdateCheckedException err0 = err; + + if (err0 != null) + onDone(err0); + else { + if (fastMapRemap) { + assert cctx.kernalContext().clientNode(); + + remap = true; + } + else + onDone(opRes); + } + } } + + if (remap) + mapOnTopology(null, true, null, true); } /** * @param topVer Topology version. - * @param keys Keys to map. + * @param remapKeys Keys to remap or {@code null} to map all keys. * @param remap Flag indicating if this is partial remap for this future. * @param oldNodeId Old node ID if was remap. */ private void map0( AffinityTopologyVersion topVer, - Collection<?> keys, + @Nullable Collection<?> remapKeys, boolean remap, @Nullable UUID oldNodeId) { - assert oldNodeId == null || remap; + assert oldNodeId == null || remap || fastMapRemap; Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); @@ -519,12 +544,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> CacheConfiguration ccfg = cctx.config(); // Assign version on near node in CLOCK ordering mode even if fastMap is false. - GridCacheVersion updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null; + if (updVer == null) + updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null; if (updVer != null && log.isDebugEnabled()) log.debug("Assigned fast-map version for update on near node: " + updVer); if (keys.size() == 1 && !fastMap && (single == null || single)) { + assert remapKeys == null || remapKeys.size() == 1 : remapKeys; + Object key = F.first(keys); Object val; @@ -610,7 +638,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> filter, subjId, taskNameHash, - skipStore); + skipStore, + cctx.kernalContext().clientNode()); req.addUpdateEntry(cacheKey, val, @@ -647,9 +676,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> // Must do this in synchronized block because we need to atomically remove and add mapping. // Otherwise checkComplete() may see empty intermediate state. synchronized (this) { - if (remap) + if (oldNodeId != null) removeMapping(oldNodeId); + // For fastMap mode wait for all responses before remapping. + if (remap && fastMap && !mappings.isEmpty()) { + fastMapRemap = true; + + return; + } + // Create mappings first, then send messages. for (Object key : keys) { if (key == null) { @@ -705,6 +741,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + if (remapKeys != null && !remapKeys.contains(cacheKey)) + continue; + if (op != TRANSFORM) val = cctx.toCacheObject(val); @@ -748,7 +787,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> filter, subjId, taskNameHash, - skipStore); + skipStore, + cctx.kernalContext().clientNode()); pendingMappings.put(nodeId, mapped); @@ -763,6 +803,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> i++; } } + + fastMapRemap = false; } if ((single == null || single) && pendingMappings.size() == 1) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index a96a666..86c5ab8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -132,6 +132,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** Skip write-through to a persistent storage. */ private boolean skipStore; + /** */ + private boolean clientReq; + /** * Empty constructor required by {@link Externalizable}. */ @@ -148,6 +151,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri * @param fastMap Fast map scheme flag. * @param updateVer Update version set if fast map is performed. * @param topVer Topology version. + * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. * @param op Cache update operation. * @param retval Return value required flag. @@ -157,6 +161,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri * @param subjId Subject ID. * @param taskNameHash Task name hash code. * @param skipStore Skip write-through to a persistent storage. + * @param clientReq Client node request flag. */ public GridNearAtomicUpdateRequest( int cacheId, @@ -174,7 +179,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri @Nullable CacheEntryPredicate[] filter, @Nullable UUID subjId, int taskNameHash, - boolean skipStore + boolean skipStore, + boolean clientReq ) { this.cacheId = cacheId; this.nodeId = nodeId; @@ -193,6 +199,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri this.subjId = subjId; this.taskNameHash = taskNameHash; this.skipStore = skipStore; + this.clientReq = clientReq; keys = new ArrayList<>(); } @@ -266,6 +273,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** + * @return {@code True} if request sent from client node. + */ + public boolean clientRequest() { + return clientReq; + } + + /** * @return Cache write synchronization mode. */ public CacheWriteSynchronizationMode writeSynchronizationMode() { @@ -574,126 +588,132 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri switch (writer.state()) { case 3: - if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) + if (!writer.writeBoolean("clientReq", clientReq)) return false; writer.incrementState(); case 4: - if (!writer.writeMessage("conflictTtls", conflictTtls)) + if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) return false; writer.incrementState(); case 5: - if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("conflictTtls", conflictTtls)) return false; writer.incrementState(); case 6: - if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 7: - if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) + if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); case 8: - if (!writer.writeBoolean("fastMap", fastMap)) + if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) return false; writer.incrementState(); case 9: - if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) + if (!writer.writeBoolean("fastMap", fastMap)) return false; writer.incrementState(); case 10: - if (!writer.writeMessage("futVer", futVer)) + if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 11: - if (!writer.writeBoolean("hasPrimary", hasPrimary)) + if (!writer.writeMessage("futVer", futVer)) return false; writer.incrementState(); case 12: - if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeBoolean("hasPrimary", hasPrimary)) return false; writer.incrementState(); case 13: - if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) + if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); case 14: - if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) + if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 15: - if (!writer.writeBoolean("retval", retval)) + if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) return false; writer.incrementState(); case 16: - if (!writer.writeBoolean("skipStore", skipStore)) + if (!writer.writeBoolean("retval", retval)) return false; writer.incrementState(); case 17: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("skipStore", skipStore)) return false; writer.incrementState(); case 18: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 19: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 20: - if (!writer.writeBoolean("topLocked", topLocked)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 21: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeBoolean("topLocked", topLocked)) return false; writer.incrementState(); case 22: - if (!writer.writeMessage("updateVer", updateVer)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 23: + if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + + case 24: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; @@ -716,7 +736,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri switch (reader.state()) { case 3: - conflictExpireTimes = reader.readMessage("conflictExpireTimes"); + clientReq = reader.readBoolean("clientReq"); if (!reader.isLastRead()) return false; @@ -724,7 +744,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 4: - conflictTtls = reader.readMessage("conflictTtls"); + conflictExpireTimes = reader.readMessage("conflictExpireTimes"); if (!reader.isLastRead()) return false; @@ -732,7 +752,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 5: - conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); + conflictTtls = reader.readMessage("conflictTtls"); if (!reader.isLastRead()) return false; @@ -740,7 +760,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 6: - entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); + conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -748,7 +768,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 7: - expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); + entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) return false; @@ -756,7 +776,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 8: - fastMap = reader.readBoolean("fastMap"); + expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); if (!reader.isLastRead()) return false; @@ -764,7 +784,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 9: - filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); + fastMap = reader.readBoolean("fastMap"); if (!reader.isLastRead()) return false; @@ -772,7 +792,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 10: - futVer = reader.readMessage("futVer"); + filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); if (!reader.isLastRead()) return false; @@ -780,7 +800,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 11: - hasPrimary = reader.readBoolean("hasPrimary"); + futVer = reader.readMessage("futVer"); if (!reader.isLastRead()) return false; @@ -788,7 +808,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 12: - invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); + hasPrimary = reader.readBoolean("hasPrimary"); if (!reader.isLastRead()) return false; @@ -796,7 +816,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 13: - keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); if (!reader.isLastRead()) return false; @@ -804,6 +824,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 14: + keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 15: byte opOrd; opOrd = reader.readByte("op"); @@ -815,7 +843,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 15: + case 16: retval = reader.readBoolean("retval"); if (!reader.isLastRead()) @@ -823,7 +851,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 16: + case 17: skipStore = reader.readBoolean("skipStore"); if (!reader.isLastRead()) @@ -831,7 +859,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 17: + case 18: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -839,7 +867,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 18: + case 19: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -851,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 19: + case 20: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -859,7 +887,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 20: + case 21: topLocked = reader.readBoolean("topLocked"); if (!reader.isLastRead()) @@ -867,7 +895,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 21: + case 22: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -875,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 22: + case 23: updateVer = reader.readMessage("updateVer"); if (!reader.isLastRead()) @@ -883,7 +911,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 23: + case 24: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -903,7 +931,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 24; + return 25; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 05b3c7b..221b230 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -362,13 +362,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Nullable TransactionIsolation isolation, long accessTtl ) { - assert tx == null || tx instanceof GridNearTxLocal; + assert tx == null || tx instanceof GridNearTxLocal : tx; GridNearTxLocal txx = (GridNearTxLocal)tx; CacheOperationContext opCtx = ctx.operationContextPerCall(); - GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx, + GridDhtColocatedLockFuture fut = new GridDhtColocatedLockFuture(ctx, keys, txx, isRead, @@ -619,7 +619,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @return Lock future. */ IgniteInternalFuture<Exception> lockAllAsync( - final GridCacheContext<K, V> cacheCtx, + final GridCacheContext<?, ?> cacheCtx, @Nullable final GridNearTxLocal tx, final long threadId, final GridCacheVersion ver, @@ -700,7 +700,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @return Lock future. */ private IgniteInternalFuture<Exception> lockAllAsync0( - GridCacheContext<K, V> cacheCtx, + GridCacheContext<?, ?> cacheCtx, @Nullable final GridNearTxLocal tx, long threadId, final GridCacheVersion ver, @@ -715,7 +715,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte int cnt = keys.size(); if (tx == null) { - GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx, + GridDhtLockFuture fut = new GridDhtLockFuture(ctx, ctx.localNodeId(), ver, topVer, @@ -838,7 +838,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte assert nodeId != null; assert res != null; - GridDhtColocatedLockFuture<K, V> fut = (GridDhtColocatedLockFuture<K, V>)ctx.mvcc(). + GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc(). <Boolean>future(res.version(), res.futureId()); if (fut != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 372c517..c784948 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.*; /** * Colocated cache lock future. */ -public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> +public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> { /** */ private static final long serialVersionUID = 0L; @@ -59,7 +59,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity /** Cache registry. */ @GridToStringExclude - private GridCacheContext<K, V> cctx; + private GridCacheContext<?, ?> cctx; /** Lock owner thread. */ @GridToStringInclude @@ -121,10 +121,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @param timeout Lock acquisition timeout. * @param accessTtl TTL for read operation. * @param filter Filter. - * @param skipStore + * @param skipStore Skip store flag. */ public GridDhtColocatedLockFuture( - GridCacheContext<K, V> cctx, + GridCacheContext<?, ?> cctx, Collection<KeyCacheObject> keys, @Nullable GridNearTxLocal tx, boolean read, @@ -326,13 +326,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * Undoes all locks. * * @param dist If {@code true}, then remove locks from remote nodes as well. + * @param rollback {@code True} if should rollback tx. */ - private void undoLocks(boolean dist) { + private void undoLocks(boolean dist, boolean rollback) { // Transactions will undo during rollback. if (dist && tx == null) cctx.colocated().removeLocks(threadId, lockVer, keys); else { - if (tx != null) { + if (rollback && tx != null) { if (tx.setRollbackOnly()) { if (log.isDebugEnabled()) log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx); @@ -346,16 +347,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } /** - * - * @param dist {@code True} if need to distribute lock release. - */ - private void onFailed(boolean dist) { - undoLocks(dist); - - complete(false); - } - - /** * @param success Success flag. */ public void complete(boolean success) { @@ -475,7 +466,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity ", fut=" + this + ']'); if (!success) - undoLocks(distribute); + undoLocks(distribute, true); if (tx != null) cctx.tm().txContext(tx); @@ -550,7 +541,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity // Continue mapping on the same topology version as it was before. this.topVer.compareAndSet(null, topVer); - map(keys); + map(keys, false); markInitialized(); @@ -558,14 +549,17 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } // Must get topology snapshot and map on that version. - mapOnTopology(); + mapOnTopology(false, null); } /** * Acquires topology future and checks it completeness under the read lock. If it is not complete, * will asynchronously wait for it's completeness and then try again. + * + * @param remap Remap flag. + * @param c Optional closure to run after map. */ - private void mapOnTopology() { + private void mapOnTopology(final boolean remap, @Nullable final Runnable c) { // We must acquire topology snapshot from the topology version future. cctx.topology().readLock(); @@ -589,19 +583,30 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity AffinityTopologyVersion topVer = fut.topologyVersion(); - if (tx != null) - tx.topologyVersion(topVer); + if (remap) { + if (tx != null) + tx.onRemap(topVer); + + this.topVer.set(topVer); + } + else { + if (tx != null) + tx.topologyVersion(topVer); + + this.topVer.compareAndSet(null, topVer); + } - this.topVer.compareAndSet(null, topVer); + map(keys, remap); - map(keys); + if (c != null) + c.run(); markInitialized(); } else { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - mapOnTopology(); + mapOnTopology(remap, c); } }); } @@ -617,8 +622,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * groups belonging to one primary node and locks for these groups are acquired sequentially. * * @param keys Keys. + * @param remap Remap flag. */ - private void map(Collection<KeyCacheObject> keys) { + private void map(Collection<KeyCacheObject> keys, boolean remap) { try { AffinityTopologyVersion topVer = this.topVer.get(); @@ -633,8 +639,12 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity return; } + boolean clientNode = cctx.kernalContext().clientNode(); + + assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); + // First assume this node is primary for all keys passed in. - if (mapAsPrimary(keys, topVer)) + if (!clientNode && mapAsPrimary(keys, topVer)) return; Deque<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>(); @@ -668,6 +678,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity boolean hasRmtNodes = false; + boolean first = true; + // Create mini futures. for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) { GridNearLockMapping mapping = iter.next(); @@ -736,6 +748,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (cand != null && !cand.reentry()) { if (req == null) { + boolean clientFirst = false; + + if (first) { + clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks()); + + first = false; + } + req = new GridNearLockRequest( cctx.cacheId(), topVer, @@ -757,7 +777,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity inTx() ? tx.subjectId() : null, inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, - skipStore); + skipStore, + clientFirst); mapping.request(req); } @@ -815,7 +836,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (hasRmtNodes) { trackable = true; - if (!cctx.mvcc().addFuture(this)) + if (!remap && !cctx.mvcc().addFuture(this)) throw new IllegalStateException("Duplicate future ID: " + this); } else @@ -1249,75 +1270,111 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity return; } - int i = 0; + if (res.clientRemapVersion() != null) { + assert cctx.kernalContext().clientNode(); + + IgniteInternalFuture<?> affFut = + cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion()); + + if (affFut != null && !affFut.isDone()) { + affFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + remap(); + } + }); + } + else + remap(); + } + else { + int i = 0; - for (KeyCacheObject k : keys) { - IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k); + for (KeyCacheObject k : keys) { + IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k); - CacheObject newVal = res.value(i); + CacheObject newVal = res.value(i); - GridCacheVersion dhtVer = res.dhtVersion(i); + GridCacheVersion dhtVer = res.dhtVersion(i); - if (newVal == null) { - if (oldValTup != null) { - if (oldValTup.get1().equals(dhtVer)) - newVal = oldValTup.get2(); + if (newVal == null) { + if (oldValTup != null) { + if (oldValTup.get1().equals(dhtVer)) + newVal = oldValTup.get2(); + } } - } - if (inTx()) { - IgniteTxEntry txEntry = tx.entry(cctx.txKey(k)); + if (inTx()) { + IgniteTxEntry txEntry = tx.entry(cctx.txKey(k)); + + // In colocated cache we must receive responses only for detached entries. + assert txEntry.cached().detached() : txEntry; - // In colocated cache we must receive responses only for detached entries. - assert txEntry.cached().detached(); + txEntry.markLocked(); - txEntry.markLocked(); + GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached(); - GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached(); + if (res.dhtVersion(i) == null) { + onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + + "(will fail the lock): " + res)); + + return; + } - if (res.dhtVersion(i) == null) { - onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + - "(will fail the lock): " + res)); + // Set value to detached entry. + entry.resetFromPrimary(newVal, dhtVer); - return; + tx.hasRemoteLocks(true); + + if (log.isDebugEnabled()) + log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); + } + else + cctx.mvcc().markExplicitOwner(k, threadId); + + if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + cctx.events().addEvent(cctx.affinity().partition(k), + k, + tx, + null, + EVT_CACHE_OBJECT_READ, + newVal, + newVal != null, + null, + false, + CU.subjectId(tx, cctx.shared()), + null, + tx == null ? null : tx.resolveTaskName()); } - // Set value to detached entry. - entry.resetFromPrimary(newVal, dhtVer); + i++; + } - if (log.isDebugEnabled()) - log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); + try { + proceedMapping(mappings); } - else - cctx.mvcc().markExplicitOwner(k, threadId); - - if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { - cctx.events().addEvent(cctx.affinity().partition(k), - k, - tx, - null, - EVT_CACHE_OBJECT_READ, - newVal, - newVal != null, - null, - false, - CU.subjectId(tx, cctx.shared()), - null, - tx == null ? null : tx.resolveTaskName()); + catch (IgniteCheckedException e) { + onDone(e); } - i++; + onDone(true); } + } + } - try { - proceedMapping(mappings); - } - catch (IgniteCheckedException e) { - onDone(e); - } + /** + * + */ + private void remap() { + undoLocks(false, false); - onDone(true); - } + for (KeyCacheObject key : GridDhtColocatedLockFuture.this.keys) + cctx.mvcc().removeExplicitLock(threadId, key, lockVer); + + mapOnTopology(true, new Runnable() { + @Override public void run() { + onDone(true); + } + }); } /** {@inheritDoc} */