Repository: incubator-ignite Updated Branches: refs/heads/ignite-861 1141d456a -> d6a35cd29 (forced update)
IGNITE-80 - Porting changes to a separate branch. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dcda61b4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dcda61b4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dcda61b4 Branch: refs/heads/ignite-861 Commit: dcda61b4fe2be3005544a3fc915b19ac3e4c9598 Parents: 1e53395 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Wed Apr 29 14:08:05 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Wed Apr 29 14:08:05 2015 -0700 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 5 +-- .../GridCachePartitionExchangeManager.java | 4 +- .../distributed/dht/GridDhtCacheAdapter.java | 6 ++- .../dht/atomic/GridDhtAtomicCache.java | 4 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 42 +++++++++++++++----- .../dht/atomic/GridNearAtomicUpdateRequest.java | 36 ++++++++++++++--- .../colocated/GridDhtColocatedLockFuture.java | 4 +- .../cache/transactions/IgniteTxManager.java | 24 +++++++++++ 8 files changed, 101 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index b8668e6..112330a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -146,9 +146,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass())); if (c == null) { - if (log.isDebugEnabled()) - log.debug("Received message without registered handler (will ignore) [msg=" + cacheMsg + - ", nodeId=" + nodeId + ']'); + U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg + + ", nodeId=" + nodeId + ']'); return; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 5f82ae2..e61168e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -409,10 +409,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param ver Topology version. * @return Future or {@code null} is future is already completed. */ - public @Nullable IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion ver) { + @Nullable public IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion ver) { GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut; - if (lastInitializedFut0 != null && lastInitializedFut0.topologyVersion().compareTo(ver) >= 0) { + if (lastInitializedFut0 != null && lastInitializedFut0.topologyVersion().compareTo(ver) == 0) { if (log.isDebugEnabled()) log.debug("Return lastInitializedFut for topology ready future " + "[ver=" + ver + ", fut=" + lastInitializedFut0 + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 1c46fd0..4d1db85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -645,8 +645,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap res.error(e); } - res.invalidPartitions(fut.invalidPartitions(), - new AffinityTopologyVersion(ctx.discovery().topologyVersion())); + if (!F.isEmpty(fut.invalidPartitions())) + res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().readyAffinityVersion()); + else + res.invalidPartitions(fut.invalidPartitions(), req.topologyVersion()); try { ctx.io().send(nodeId, res, ctx.ioPolicy()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 905f7bf..a30f211 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1041,7 +1041,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Do not check topology version for CLOCK versioning since // partition exchange will wait for near update future. - if (topology().topologyVersion().equals(req.topologyVersion()) || + // Also do not check topology version if topology was locked on near node by + // external transaction or explicit lock. + if (topology().topologyVersion().equals(req.topologyVersion()) || req.topologyLocked() || ctx.config().getAtomicWriteOrderMode() == CLOCK) { ClusterNode node = ctx.discovery().node(nodeId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 072ab52..3dc89f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.dr.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; @@ -136,6 +137,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Task name hash. */ private final int taskNameHash; + /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ + private boolean topLocked; + /** Skip store flag. */ private final boolean skipStore; @@ -289,7 +293,23 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param waitTopFut Whether to wait for topology future. */ public void map(boolean waitTopFut) { - mapOnTopology(keys, false, null, waitTopFut); + AffinityTopologyVersion topVer = null; + + IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(); + + if (tx != null && tx.topologyVersionSnapshot() != null) + topVer = tx.topologyVersionSnapshot(); + + if (topVer == null) + topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); + + if (topVer == null) + mapOnTopology(keys, false, null, waitTopFut); + else { + topLocked = true; + + map0(topVer, keys, false, null); + } } /** {@inheritDoc} */ @@ -430,15 +450,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } topVer = fut.topologyVersion(); - - if (futVer == null) - // Assign future version in topology read lock before first exception may be thrown. - futVer = cctx.versions().next(topVer); } else { if (waitTopFut) { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + @Override + public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { mapOnTopology(keys, remap, oldNodeId, waitTopFut); } }); @@ -448,9 +465,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return; } - - if (!remap && (cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC)) - cctx.mvcc().addAtomicFuture(version(), this); } finally { cache.topology().readUnlock(); @@ -474,6 +488,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** + * @param topVer Topology version. * @param keys Keys to map. * @param remap Flag indicating if this is partial remap for this future. * @param oldNodeId Old node ID if was remap. @@ -494,6 +509,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return; } + if (futVer == null) + // Assign future version in topology read lock before first exception may be thrown. + futVer = cctx.versions().next(topVer); + + if (!remap && (cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC)) + cctx.mvcc().addAtomicFuture(version(), this); + CacheConfiguration ccfg = cctx.config(); // Assign version on near node in CLOCK ordering mode even if fastMap is false. @@ -579,6 +601,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> fastMap, updVer, topVer, + topLocked, syncMode, op, retval, @@ -716,6 +739,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> fastMap, updVer, topVer, + topLocked, syncMode, op, retval, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index e0e3e26..a96a666 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -64,6 +64,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** Topology version. */ private AffinityTopologyVersion topVer; + /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ + private boolean topLocked; + /** Write synchronization mode. */ private CacheWriteSynchronizationMode syncMode; @@ -162,6 +165,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri boolean fastMap, @Nullable GridCacheVersion updateVer, @NotNull AffinityTopologyVersion topVer, + boolean topLocked, CacheWriteSynchronizationMode syncMode, GridCacheOperation op, boolean retval, @@ -179,6 +183,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri this.updateVer = updateVer; this.topVer = topVer; + this.topLocked = topLocked; this.syncMode = syncMode; this.op = op; this.retval = retval; @@ -254,6 +259,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** + * @return Topology locked flag. + */ + public boolean topologyLocked() { + return topLocked; + } + + /** * @return Cache write synchronization mode. */ public CacheWriteSynchronizationMode writeSynchronizationMode() { @@ -664,18 +676,24 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); case 20: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeBoolean("topLocked", topLocked)) return false; writer.incrementState(); case 21: - if (!writer.writeMessage("updateVer", updateVer)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 22: + if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + + case 23: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; @@ -842,7 +860,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 20: - topVer = reader.readMessage("topVer"); + topLocked = reader.readBoolean("topLocked"); if (!reader.isLastRead()) return false; @@ -850,7 +868,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 21: - updateVer = reader.readMessage("updateVer"); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; @@ -858,6 +876,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 22: + updateVer = reader.readMessage("updateVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 23: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -877,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 23; + return 24; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 5b74b31..6292f2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -292,7 +292,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity false, false); - cand.topologyVersion(new AffinityTopologyVersion(topVer.get().topologyVersion())); + cand.topologyVersion(topVer.get()); } } else { @@ -311,7 +311,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity false, false); - cand.topologyVersion(new AffinityTopologyVersion(topVer.get().topologyVersion())); + cand.topologyVersion(topVer.get()); } else cand = cand.reenter(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index c494602..874e640 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -639,6 +639,30 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * @return Any transaction associated with the current thread. + */ + public IgniteInternalTx anyActiveThreadTx() { + long threadId = Thread.currentThread().getId(); + + IgniteInternalTx tx = threadMap.get(threadId); + + if (tx != null && tx.topologyVersionSnapshot() != null) + return tx; + + for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) { + if (!cacheCtx.systemTx()) + continue; + + tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId())); + + if (tx != null && tx.topologyVersionSnapshot() != null) + return tx; + } + + return null; + } + + /** * @return Local transaction. */ @Nullable public IgniteInternalTx localTxx() {