# ignite-23
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e4e54bad Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e4e54bad Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e4e54bad Branch: refs/heads/ignite-929 Commit: e4e54bad1481a64e6b270231158d2d299441eb9d Parents: ff17caf Author: sboikov <sboi...@gridgain.com> Authored: Mon May 25 10:00:46 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon May 25 11:12:32 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 8 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 10 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 6 +- .../dht/atomic/GridNearAtomicUpdateRequest.java | 112 ++++++----- .../colocated/GridDhtColocatedLockFuture.java | 3 + ...niteCacheClientNodeChangingTopologyTest.java | 184 +++++++++++++++++-- 6 files changed, 258 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 7fe847a..c6a2bf7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1023,8 +1023,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { IgniteCacheExpiryPolicy expiry = null; - boolean clientReq = false; - try { // If batch store update is enabled, we need to lock all entries. // First, need to acquire locks on cache entries, then check filter. @@ -1052,13 +1050,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return; } - clientReq = CU.clientNode(node); - // Do not check topology version for CLOCK versioning since // partition exchange will wait for near update future. // Also do not check topology version if topology was locked on near node by // external transaction or explicit lock. - if ((req.fastMap() && !clientReq) || req.topologyLocked() || + if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() || !needRemap(req.topologyVersion(), topology().topologyVersion(), req.keys())) { boolean hasNear = ctx.discovery().cacheNearNode(node, name()); @@ -1161,7 +1157,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } catch (GridDhtInvalidPartitionException ignore) { - assert !req.fastMap() || clientReq : req; + assert !req.fastMap() || req.clientRequest() : req; if (log.isDebugEnabled()) log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 40ab104..ff8454e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -86,6 +86,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Future keys. */ private Collection<KeyCacheObject> keys; + /** */ + private boolean waitForExchange; + /** * @param cctx Cache context. * @param completionCb Callback to invoke when future is completed. @@ -113,6 +116,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class); keys = new ArrayList<>(updateReq.keys().size()); + + boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()); + + waitForExchange = !topLocked; } /** {@inheritDoc} */ @@ -164,8 +171,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** {@inheritDoc} */ @Override public boolean waitForPartitionExchange() { - // Wait dht update futures in PRIMARY mode. - return cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY; + return waitForExchange; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 82659ca..50c3d56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -614,7 +614,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> filter, subjId, taskNameHash, - skipStore); + skipStore, + cctx.kernalContext().clientNode()); req.addUpdateEntry(cacheKey, val, @@ -755,7 +756,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> filter, subjId, taskNameHash, - skipStore); + skipStore, + cctx.kernalContext().clientNode()); pendingMappings.put(nodeId, mapped); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index a96a666..86c5ab8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -132,6 +132,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** Skip write-through to a persistent storage. */ private boolean skipStore; + /** */ + private boolean clientReq; + /** * Empty constructor required by {@link Externalizable}. */ @@ -148,6 +151,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri * @param fastMap Fast map scheme flag. * @param updateVer Update version set if fast map is performed. * @param topVer Topology version. + * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. * @param op Cache update operation. * @param retval Return value required flag. @@ -157,6 +161,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri * @param subjId Subject ID. * @param taskNameHash Task name hash code. * @param skipStore Skip write-through to a persistent storage. + * @param clientReq Client node request flag. */ public GridNearAtomicUpdateRequest( int cacheId, @@ -174,7 +179,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri @Nullable CacheEntryPredicate[] filter, @Nullable UUID subjId, int taskNameHash, - boolean skipStore + boolean skipStore, + boolean clientReq ) { this.cacheId = cacheId; this.nodeId = nodeId; @@ -193,6 +199,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri this.subjId = subjId; this.taskNameHash = taskNameHash; this.skipStore = skipStore; + this.clientReq = clientReq; keys = new ArrayList<>(); } @@ -266,6 +273,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** + * @return {@code True} if request sent from client node. + */ + public boolean clientRequest() { + return clientReq; + } + + /** * @return Cache write synchronization mode. */ public CacheWriteSynchronizationMode writeSynchronizationMode() { @@ -574,126 +588,132 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri switch (writer.state()) { case 3: - if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) + if (!writer.writeBoolean("clientReq", clientReq)) return false; writer.incrementState(); case 4: - if (!writer.writeMessage("conflictTtls", conflictTtls)) + if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) return false; writer.incrementState(); case 5: - if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("conflictTtls", conflictTtls)) return false; writer.incrementState(); case 6: - if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 7: - if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) + if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); case 8: - if (!writer.writeBoolean("fastMap", fastMap)) + if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) return false; writer.incrementState(); case 9: - if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) + if (!writer.writeBoolean("fastMap", fastMap)) return false; writer.incrementState(); case 10: - if (!writer.writeMessage("futVer", futVer)) + if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 11: - if (!writer.writeBoolean("hasPrimary", hasPrimary)) + if (!writer.writeMessage("futVer", futVer)) return false; writer.incrementState(); case 12: - if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeBoolean("hasPrimary", hasPrimary)) return false; writer.incrementState(); case 13: - if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) + if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); case 14: - if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) + if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 15: - if (!writer.writeBoolean("retval", retval)) + if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) return false; writer.incrementState(); case 16: - if (!writer.writeBoolean("skipStore", skipStore)) + if (!writer.writeBoolean("retval", retval)) return false; writer.incrementState(); case 17: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("skipStore", skipStore)) return false; writer.incrementState(); case 18: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 19: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 20: - if (!writer.writeBoolean("topLocked", topLocked)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 21: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeBoolean("topLocked", topLocked)) return false; writer.incrementState(); case 22: - if (!writer.writeMessage("updateVer", updateVer)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 23: + if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + + case 24: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; @@ -716,7 +736,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri switch (reader.state()) { case 3: - conflictExpireTimes = reader.readMessage("conflictExpireTimes"); + clientReq = reader.readBoolean("clientReq"); if (!reader.isLastRead()) return false; @@ -724,7 +744,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 4: - conflictTtls = reader.readMessage("conflictTtls"); + conflictExpireTimes = reader.readMessage("conflictExpireTimes"); if (!reader.isLastRead()) return false; @@ -732,7 +752,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 5: - conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); + conflictTtls = reader.readMessage("conflictTtls"); if (!reader.isLastRead()) return false; @@ -740,7 +760,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 6: - entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); + conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -748,7 +768,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 7: - expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); + entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) return false; @@ -756,7 +776,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 8: - fastMap = reader.readBoolean("fastMap"); + expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); if (!reader.isLastRead()) return false; @@ -764,7 +784,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 9: - filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); + fastMap = reader.readBoolean("fastMap"); if (!reader.isLastRead()) return false; @@ -772,7 +792,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 10: - futVer = reader.readMessage("futVer"); + filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); if (!reader.isLastRead()) return false; @@ -780,7 +800,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 11: - hasPrimary = reader.readBoolean("hasPrimary"); + futVer = reader.readMessage("futVer"); if (!reader.isLastRead()) return false; @@ -788,7 +808,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 12: - invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); + hasPrimary = reader.readBoolean("hasPrimary"); if (!reader.isLastRead()) return false; @@ -796,7 +816,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 13: - keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); if (!reader.isLastRead()) return false; @@ -804,6 +824,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 14: + keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 15: byte opOrd; opOrd = reader.readByte("op"); @@ -815,7 +843,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 15: + case 16: retval = reader.readBoolean("retval"); if (!reader.isLastRead()) @@ -823,7 +851,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 16: + case 17: skipStore = reader.readBoolean("skipStore"); if (!reader.isLastRead()) @@ -831,7 +859,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 17: + case 18: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -839,7 +867,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 18: + case 19: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -851,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 19: + case 20: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -859,7 +887,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 20: + case 21: topLocked = reader.readBoolean("topLocked"); if (!reader.isLastRead()) @@ -867,7 +895,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 21: + case 22: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -875,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 22: + case 23: updateVer = reader.readMessage("updateVer"); if (!reader.isLastRead()) @@ -883,7 +911,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 23: + case 24: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -903,7 +931,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 24; + return 25; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 500495a..cc8f064 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -341,6 +341,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture else if (log.isDebugEnabled()) log.debug("Transaction was not marked rollback-only while locks were not acquired: " + tx); } + + for (KeyCacheObject key : GridDhtColocatedLockFuture.this.keys) + cctx.mvcc().removeExplicitLock(threadId, key, lockVer); } cctx.mvcc().recheckPendingLocks(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index c01ef6f..4603aaf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -24,8 +24,11 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -38,6 +41,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; import org.apache.ignite.transactions.*; +import org.eclipse.jetty.util.*; import org.jetbrains.annotations.*; import javax.cache.*; @@ -96,21 +100,53 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac * @throws Exception If failed. */ public void testAtomicPutAllClockMode() throws Exception { - atomicPutAll(CLOCK); + atomicPut(CLOCK, true, null); } /** * @throws Exception If failed. */ public void testAtomicPutAllPrimaryMode() throws Exception { - atomicPutAll(PRIMARY); + atomicPut(PRIMARY, true, null); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutAllNearEnabledClockMode() throws Exception { + atomicPut(CLOCK, true, new NearCacheConfiguration()); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutAllNearEnabledPrimaryMode() throws Exception { + atomicPut(PRIMARY, true, new NearCacheConfiguration()); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutClockMode() throws Exception { + atomicPut(CLOCK, false, null); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutPrimaryMode() throws Exception { + atomicPut(PRIMARY, false, null); } /** * @param writeOrder Write order. + * @param putAll If {@code true} executes putAll. + * @param nearCfg Near cache configuration. * @throws Exception If failed. */ - private void atomicPutAll(CacheAtomicWriteOrderMode writeOrder) throws Exception { + private void atomicPut(CacheAtomicWriteOrderMode writeOrder, + final boolean putAll, + @Nullable NearCacheConfiguration nearCfg) throws Exception { ccfg = new CacheConfiguration(); ccfg.setCacheMode(PARTITIONED); @@ -123,15 +159,21 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac IgniteEx ignite0 = startGrid(0); IgniteEx ignite1 = startGrid(1); + ccfg.setNearConfiguration(nearCfg); + client = true; + ccfg.setNearConfiguration(null); + Ignite ignite2 = startGrid(2); assertTrue(ignite2.configuration().isClientMode()); final Map<Integer, Integer> map = new HashMap<>(); - for (int i = 0; i < 100; i++) + final int KEYS = putAll ? 100 : 1; + + for (int i = 0; i < KEYS; i++) map.put(i, i); TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); @@ -148,7 +190,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac @Override public Object call() throws Exception { Thread.currentThread().setName("put-thread"); - cache.putAll(map); + if (putAll) + cache.putAll(map); + else + cache.put(0, 0); return null; } @@ -172,7 +217,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac map.clear(); - for (int i = 0; i < 100; i++) + for (int i = 0; i < KEYS; i++) map.put(i, i + 1); // Block messages requests for single node. @@ -182,7 +227,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac @Override public Object call() throws Exception { Thread.currentThread().setName("put-thread"); - cache.putAll(map); + if (putAll) + cache.putAll(map); + else + cache.put(0, 1); return null; } @@ -202,10 +250,13 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac checkData(map, cache, 4); - for (int i = 0; i < 100; i++) + for (int i = 0; i < KEYS; i++) map.put(i, i + 2); - cache.putAll(map); + if (putAll) + cache.putAll(map); + else + cache.put(0, 2); checkData(map, cache, 4); } @@ -401,7 +452,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac /** * @throws Exception If failed. */ - public void _testLock() throws Exception { + public void testLock() throws Exception { lock(null); } @@ -740,6 +791,72 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac } /** + * @throws Exception If failed. + */ + public void testLockFromClientBlocksExchange() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + startGrid(0); + startGrid(1); + + client = true; + + Ignite ignite2 = startGrid(2); + + IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + Lock lock = cache.lock(0); + + lock.lock(); + + IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + client = false; + + startGrid(3); + + return null; + } + }); + + U.sleep(2000); + + assertFalse(startFut.isDone()); + + AffinityTopologyVersion ver = new AffinityTopologyVersion(4); + + List<IgniteInternalFuture<?>> futs = new ArrayList<>(); + + U.sleep(2000); + + for (int i = 0; i < 3; i++) { + Ignite ignite = ignite(i); + + IgniteInternalFuture<?> fut = + ((IgniteKernal)ignite).context().cache().context().exchange().affinityReadyFuture(ver); + + assertNotNull(fut); + + assertFalse(fut.isDone()); + + futs.add(fut); + } + + lock.unlock(); + + for (IgniteInternalFuture<?> fut : futs) + fut.get(10_000); + + startFut.get(10_000); + } + + /** * @param map Expected data. * @param clientCache Client cache. * @param expNodes Expected nodes number. @@ -764,11 +881,35 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac for (Map.Entry<Integer, Integer> e : map.entrySet()) { Integer key = e.getKey(); + GridCacheVersion ver = null; + for (Ignite node : nodes) { IgniteCache<Integer, Integer> cache = node.cache(null); - if (aff.isPrimaryOrBackup(node.cluster().localNode(), key) || node == nearCacheNode) + boolean affNode = aff.isPrimaryOrBackup(node.cluster().localNode(), key); + + if (affNode || node == nearCacheNode) { assertEquals("Unexpected value for " + node.name(), e.getValue(), cache.localPeek(key)); + + GridCacheAdapter cache0 = ((IgniteKernal)node).internalCache(null); + + if (affNode && cache0.isNear()) + cache0 = ((GridNearCacheAdapter)cache0).dht(); + + GridCacheEntryEx entry = cache0.peekEx(key); + + assertNotNull("No entry [node=" + node.name() + ", key=" + key + ']', entry); + + GridCacheVersion ver0 = entry instanceof GridNearCacheEntry ? + ((GridNearCacheEntry)entry).dhtVersion() : entry.version(); + + assertNotNull("Null version [node=" + node.name() + ", key=" + key + ']', ver0); + + if (ver == null) + ver = ver0; + else + assertEquals(ver0, ver); + } else assertNull("Unexpected non-null value for " + node.name(), cache.localPeek(key)); } @@ -779,6 +920,9 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac return false; } + catch (Exception e) { + fail("Unexpected exception: " + e); + } return true; } @@ -860,6 +1004,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac final int THREADS = CLIENT_CNT * 3; + final ConcurrentHashSet<Integer> putKeys = new ConcurrentHashSet<>(); + try { GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { @Override public Object call() throws Exception { @@ -887,8 +1033,11 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac while (!stop.get()) { TreeMap<Integer, Integer> map = new TreeMap<>(); - for (int i = 0; i < 100; i++) - map.put(rnd.nextInt(0, 1000), i); + for (int i = 0; i < 100; i++) { + Integer key = rnd.nextInt(0, 1000); + + map.put(key, key); + } try { if (useTx) { @@ -904,6 +1053,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac } else cache.putAll(map); + + putKeys.addAll(map.keySet()); } catch (CacheException | IgniteException e) { log.info("Update failed, ignore: " + e); @@ -1002,6 +1153,13 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac for (IgniteInternalFuture<?> fut : futs) fut.get(); + + Map<Integer, Integer> map = new HashMap<>(); + + for (Integer key : putKeys) + map.put(key, key); + + checkData(map, grid(SRV_CNT).cache(null), SRV_CNT + CLIENT_CNT); } /**