# send previous value for atomic cache updates on unstable topology
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f0b24c47 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f0b24c47 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f0b24c47 Branch: refs/heads/ignite-426 Commit: f0b24c47a6b5a449a63ca5cf8fdc85811b4cc278 Parents: 4c634ed Author: sboikov <sboi...@gridgain.com> Authored: Mon Aug 17 10:22:14 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Aug 17 13:54:16 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryEx.java | 5 +- .../processors/cache/GridCacheMapEntry.java | 29 +- .../dht/GridClientPartitionTopology.java | 7 + .../dht/GridDhtPartitionTopology.java | 6 + .../dht/GridDhtPartitionTopologyImpl.java | 45 ++- .../dht/atomic/GridDhtAtomicCache.java | 63 +++-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 8 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 69 ++++- .../dht/atomic/GridNearAtomicUpdateFuture.java | 5 +- .../distributed/near/GridNearAtomicCache.java | 6 +- .../processors/cache/GridCacheTestEntryEx.java | 3 +- ...acheContinuousQueryFailoverAbstractTest.java | 271 ++++++++++++++++--- 12 files changed, 432 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 1b5a717..88ebd48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -407,6 +407,7 @@ public interface GridCacheEntryEx { * @param primary If update is performed on primary node (the one which assigns version). * @param checkVer Whether update should check current version and ignore update if current version is * greater than passed in. + * @param topVer Topology version. * @param filter Optional filter to check. * @param drType DR type. * @param conflictTtl Conflict TTL (if any). @@ -416,6 +417,7 @@ public interface GridCacheEntryEx { * @param intercept If {@code true} then calls cache interceptor. * @param subjId Subject ID initiated this update. * @param taskName Task name. + * @param prevVal Previous value. * @return Tuple where first value is flag showing whether operation succeeded, * second value is old entry value if return value is requested, third is updated entry value, * fourth is the version to enqueue for deferred delete the fifth is DR conflict context @@ -448,7 +450,8 @@ public interface GridCacheEntryEx { boolean conflictResolve, boolean intercept, @Nullable UUID subjId, - String taskName + String taskName, + @Nullable CacheObject prevVal ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 283b0b4..e3b25df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1601,7 +1601,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean conflictResolve, boolean intercept, @Nullable UUID subjId, - String taskName + String taskName, + @Nullable CacheObject prevVal ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { assert cctx.atomic(); @@ -1783,6 +1784,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme "[entry=" + this + ", newVer=" + newVer + ']'); } + if (!cctx.isNear()) { + CacheObject evtVal; + + if (op == GridCacheOperation.TRANSFORM) { + EntryProcessor<Object, Object, ?> entryProcessor = + (EntryProcessor<Object, Object, ?>)writeObj; + + CacheInvokeEntry<Object, Object> entry = + new CacheInvokeEntry<>(cctx, key, prevVal, version()); + + try { + entryProcessor.process(entry, invokeArgs); + + evtVal = entry.modified() ? + cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal; + } + catch (Exception e) { + evtVal = prevVal; + } + } + else + evtVal = (CacheObject)writeObj; + + cctx.continuousQueries().onEntryUpdated(this, key, evtVal, prevVal, primary, false, topVer); + } + return new GridCacheUpdateAtomicResult(false, retval ? rawGetOrUnmarshalUnlocked(false) : null, null, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 5473348..8a7576a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -824,6 +824,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { + assert false; + + return false; + } + + /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']'); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 2d9771f..9933444 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -228,4 +228,10 @@ public interface GridDhtPartitionTopology { * @param threshold Threshold for number of entries. */ public void printMemoryStats(int threshold); + + /** + * @param topVer Topology version. + * @return {@code True} if rebalance process finished. + */ + public boolean rebalanceFinished(AffinityTopologyVersion topVer); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 93f085c..1c71ff1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -85,6 +85,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** Partition update counter. */ private Map<Integer, Long> cntrMap = new HashMap<>(); + /** */ + private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE; + /** * @param cctx Context. */ @@ -114,6 +117,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { topReadyFut = null; topVer = AffinityTopologyVersion.NONE; + + rebalancedTopVer = AffinityTopologyVersion.NONE; } finally { lock.writeLock().unlock(); @@ -203,6 +208,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { updateSeq.setIfGreater(updSeq); topReadyFut = exchFut; + + rebalancedTopVer = AffinityTopologyVersion.NONE;; } finally { lock.writeLock().unlock(); @@ -508,6 +515,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } + updateRebalanceVersion(); + consistencyCheck(); } finally { @@ -690,7 +699,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param states Additional partition states. * @return List of nodes for the partition. */ - private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { + private List<ClusterNode> nodes(int p, + AffinityTopologyVersion topVer, + GridDhtPartitionState state, + GridDhtPartitionState... states) { Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null; lock.readLock().lock(); @@ -888,6 +900,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { boolean changed = checkEvictions(updateSeq); + updateRebalanceVersion(); + consistencyCheck(); if (log.isDebugEnabled()) @@ -1000,6 +1014,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { changed |= checkEvictions(updateSeq); + updateRebalanceVersion(); + consistencyCheck(); if (log.isDebugEnabled()) @@ -1196,6 +1212,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (part.own()) { updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet()); + updateRebalanceVersion(); + consistencyCheck(); return true; @@ -1268,6 +1286,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { + return topVer.equals(rebalancedTopVer); + } + + /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']'); @@ -1280,6 +1303,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** + * + */ + private void updateRebalanceVersion() { + if (!rebalancedTopVer.equals(topVer)) { + for (int i = 0; i < cctx.affinity().partitions(); i++) { + List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer); + List<ClusterNode> owners = owners(i); + + if (affNodes.size() != owners.size() || !owners.containsAll(affNodes)) + return; + } + + rebalancedTopVer = topVer; + + if (log.isDebugEnabled()) + log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']'); + } + } + + /** * @param p Partition. * @param nodeId Node ID. * @param match State to match. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index eb4d51c..6c05bfe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1057,7 +1057,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; try { - topology().readLock(); + GridDhtPartitionTopology top = topology(); + + top.readLock(); try { if (topology().stopping()) { @@ -1074,7 +1076,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Also do not check topology version if topology was locked on near node by // external transaction or explicit lock. if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() || - !needRemap(req.topologyVersion(), topology().topologyVersion())) { + !needRemap(req.topologyVersion(), top.topologyVersion())) { ClusterNode node = ctx.discovery().node(nodeId); if (node == null) { @@ -1089,7 +1091,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (ver == null) { // Assign next version for update inside entries lock. - ver = ctx.versions().next(topology().topologyVersion()); + ver = ctx.versions().next(top.topologyVersion()); if (hasNear) res.nearVersion(ver); @@ -1101,6 +1103,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { log.debug("Using cache version for update request on primary node [ver=" + ver + ", req=" + req + ']'); + boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion()); + dhtFut = createDhtFuture(ver, req, res, completionCb, false); expiry = expiryPolicy(req.expiry()); @@ -1123,7 +1127,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { completionCb, ctx.isDrEnabled(), taskName, - expiry); + expiry, + sndPrevVal); deleted = updRes.deleted(); dhtFut = updRes.dhtFuture(); @@ -1142,7 +1147,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { completionCb, ctx.isDrEnabled(), taskName, - expiry); + expiry, + sndPrevVal); retVal = updRes.returnValue(); deleted = updRes.deleted(); @@ -1162,7 +1168,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { remap = true; } finally { - topology().readUnlock(); + top.readUnlock(); } } catch (GridCacheEntryRemovedException e) { @@ -1245,6 +1251,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param replicate Whether replication is enabled. * @param taskName Task name. * @param expiry Expiry policy. + * @param sndPrevVal If {@code true} sends previous value to backups. * @return Deleted entries. * @throws GridCacheEntryRemovedException Should not be thrown. */ @@ -1260,7 +1267,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean replicate, String taskName, - @Nullable IgniteCacheExpiryPolicy expiry + @Nullable IgniteCacheExpiryPolicy expiry, + boolean sndPrevVal ) throws GridCacheEntryRemovedException { assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts. assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll. @@ -1407,7 +1415,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { replicate, updRes, taskName, - expiry); + expiry, + sndPrevVal); firstEntryIdx = i; @@ -1455,7 +1464,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { replicate, updRes, taskName, - expiry); + expiry, + sndPrevVal); firstEntryIdx = i; @@ -1574,7 +1584,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { replicate, updRes, taskName, - expiry); + expiry, + sndPrevVal); } else assert filtered.isEmpty(); @@ -1650,6 +1661,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param replicate Whether DR is enabled for that cache. * @param taskName Task name. * @param expiry Expiry policy. + * @param sndPrevVal If {@code true} sends previous value to backups. * @return Return value. * @throws GridCacheEntryRemovedException Should be never thrown. */ @@ -1664,7 +1676,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean replicate, String taskName, - @Nullable IgniteCacheExpiryPolicy expiry + @Nullable IgniteCacheExpiryPolicy expiry, + boolean sndPrevVal ) throws GridCacheEntryRemovedException { GridCacheReturn retVal = null; Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; @@ -1721,7 +1734,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.invokeArguments(), primary && writeThrough() && !req.skipStore(), !req.skipStore(), - req.returnValue(), + sndPrevVal || req.returnValue(), expiry, true, true, @@ -1736,7 +1749,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, intercept, req.subjectId(), - taskName); + taskName, + null); if (dhtFut == null && !F.isEmpty(filteredReaders)) { dhtFut = createDhtFuture(ver, req, res, completionCb, true); @@ -1759,7 +1773,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { op == TRANSFORM ? req.entryProcessor(i) : null, updRes.newTtl(), updRes.conflictExpireTime(), - newConflictVer); + newConflictVer, + sndPrevVal, + updRes.oldValue()); } if (!F.isEmpty(filteredReaders)) @@ -1865,6 +1881,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param batchRes Batch update result. * @param taskName Task name. * @param expiry Expiry policy. + * @param sndPrevVal If {@code true} sends previous value to backups. * @return Deleted entries. */ @SuppressWarnings("ForLoopReplaceableByForEach") @@ -1885,7 +1902,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean replicate, UpdateBatchResult batchRes, String taskName, - @Nullable IgniteCacheExpiryPolicy expiry + @Nullable IgniteCacheExpiryPolicy expiry, + boolean sndPrevVal ) { assert putMap == null ^ rmvKeys == null; @@ -1987,7 +2005,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, /*write-through*/false, /*read-through*/false, - /*retval*/false, + /*retval*/sndPrevVal, expiry, /*event*/true, /*metrics*/true, @@ -2002,7 +2020,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /*conflict resolve*/false, /*intercept*/false, req.subjectId(), - taskName); + taskName, + null); assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null : "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry; @@ -2038,7 +2057,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()), updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE, - null); + null, + sndPrevVal, + updRes.oldValue()); if (!F.isEmpty(filteredReaders)) dhtFut.addNearWriteEntries(filteredReaders, @@ -2423,7 +2444,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { processDhtAtomicUpdateRequest0(nodeId, req); else { fut.listen(new CI1<IgniteInternalFuture>() { - @Override public void apply(IgniteInternalFuture future) { + @Override public void apply(IgniteInternalFuture fut) { processDhtAtomicUpdateRequest0(nodeId, req); } }); @@ -2461,6 +2482,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { entry = entryExx(key); CacheObject val = req.value(i); + CacheObject prevVal = req.previousValue(i); EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); GridCacheOperation op = entryProcessor != null ? TRANSFORM : @@ -2493,7 +2515,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, intercept, req.subjectId(), - taskName); + taskName, + prevVal); if (updRes.removeVersion() != null) { if (ctx.deferredDelete()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 601f1d8..d983e88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -210,7 +210,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer) { + @Nullable GridCacheVersion conflictVer, + boolean addPrevVal, + @Nullable CacheObject prevVal) { AffinityTopologyVersion topVer = updateReq.topologyVersion(); int part = entry.partition(); @@ -254,7 +256,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> entryProcessor, ttl, conflictExpireTime, - conflictVer); + conflictVer, + addPrevVal, + prevVal); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 6340c93..2f92fde 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -71,6 +71,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid @GridDirectCollection(CacheObject.class) private List<CacheObject> vals; + /** Previous values. */ + @GridToStringInclude + @GridDirectCollection(CacheObject.class) + private List<CacheObject> prevVals; + /** Conflict versions. */ @GridDirectCollection(GridCacheVersion.class) private List<GridCacheVersion> conflictVers; @@ -208,13 +213,17 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid * @param ttl TTL (optional). * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). + * @param addPrevVal If {@code true} adds previous value. + * @param prevVal Previous value. */ public void addWriteValue(KeyCacheObject key, @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer) { + @Nullable GridCacheVersion conflictVer, + boolean addPrevVal, + @Nullable CacheObject prevVal) { keys.add(key); if (forceTransformBackups) { @@ -225,6 +234,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid else vals.add(val); + if (addPrevVal) { + if (prevVals == null) + prevVals = new ArrayList<>(); + + prevVals.add(prevVal); + } + // In case there is no conflict, do not create the list. if (conflictVer != null) { if (conflictVers == null) { @@ -419,6 +435,17 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** * @param idx Key index. + * @return Value. + */ + @Nullable public CacheObject previousValue(int idx) { + if (prevVals != null) + return prevVals.get(idx); + + return null; + } + + /** + * @param idx Key index. * @return Entry processor. */ @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { @@ -670,42 +697,48 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid writer.incrementState(); case 17: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 18: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 19: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 21: - if (!writer.writeMessage("ttls", ttls)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 22: - if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("ttls", ttls)) return false; writer.incrementState(); case 23: + if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 24: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -840,7 +873,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 17: - subjId = reader.readUuid("subjId"); + prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -848,6 +881,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 18: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 19: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -859,7 +900,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 19: + case 20: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -867,7 +908,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 20: + case 21: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -875,7 +916,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 21: + case 22: ttls = reader.readMessage("ttls"); if (!reader.isLastRead()) @@ -883,7 +924,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 22: + case 23: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -891,7 +932,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 23: + case 24: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -911,7 +952,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 24; + return 25; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 66f0300..eec7fa0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -129,7 +129,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> private final boolean rawRetval; /** Fast map flag. */ - private final boolean fastMap; + private boolean fastMap; /** */ private boolean fastMapRemap; @@ -696,6 +696,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return; } + if (fastMap && futVer == null) + fastMap = cctx.topology().rebalanceFinished(topVer); + if (futVer == null) // Assign future version in topology read lock before first exception may be thrown. futVer = cctx.versions().next(topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 2255988..cf68d24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -239,7 +239,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { false, false, subjId, - taskName); + taskName, + null); if (updRes.removeVersion() != null) { if (ctx.deferredDelete()) @@ -341,7 +342,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { false, /*intercept*/false, req.subjectId(), - taskName); + taskName, + null); if (updRes.removeVersion() != null) { if (ctx.deferredDelete()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 0055557..0dd10ea 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -503,7 +503,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr boolean conflictResolve, boolean intercept, UUID subjId, - String taskName) throws IgniteCheckedException, + String taskName, + @Nullable CacheObject prevVal) throws IgniteCheckedException, GridCacheEntryRemovedException { assert false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java index e6f3bd7..151ae33 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java @@ -25,9 +25,12 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.resources.*; @@ -135,8 +138,91 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo /** * @throws Exception If failed. */ + public void testRebalanceVersion() throws Exception { + Ignite ignite0 = startGrid(0); + GridDhtPartitionTopology top0 = ((IgniteKernal)ignite0).context().cache().context().cacheContext(1).topology(); + + assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(1))); + assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(2))); + + Ignite ignite1 = startGrid(1); + GridDhtPartitionTopology top1 = ((IgniteKernal)ignite1).context().cache().context().cacheContext(1).topology(); + + waitRebalanceFinished(ignite0, 2); + waitRebalanceFinished(ignite1, 2); + + assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(3))); + assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(3))); + + Ignite ignite2 = startGrid(2); + GridDhtPartitionTopology top2 = ((IgniteKernal)ignite2).context().cache().context().cacheContext(1).topology(); + + waitRebalanceFinished(ignite0, 3); + waitRebalanceFinished(ignite1, 3); + waitRebalanceFinished(ignite2, 3); + + assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(4))); + assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(4))); + assertFalse(top2.rebalanceFinished(new AffinityTopologyVersion(4))); + + client = true; + + Ignite ignite3 = startGrid(3); + GridDhtPartitionTopology top3 = ((IgniteKernal)ignite3).context().cache().context().cacheContext(1).topology(); + + assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(4))); + assertTrue(top1.rebalanceFinished(new AffinityTopologyVersion(4))); + assertTrue(top2.rebalanceFinished(new AffinityTopologyVersion(4))); + assertTrue(top3.rebalanceFinished(new AffinityTopologyVersion(4))); + + stopGrid(1); + + waitRebalanceFinished(ignite0, 5); + waitRebalanceFinished(ignite2, 5); + waitRebalanceFinished(ignite3, 5); + + stopGrid(3); + + assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(6))); + assertTrue(top2.rebalanceFinished(new AffinityTopologyVersion(6))); + + stopGrid(0); + + waitRebalanceFinished(ignite2, 7); + } + + /** + * @param ignite Ignite. + * @param topVer Topology version. + * @throws Exception If failed. + */ + private void waitRebalanceFinished(Ignite ignite, long topVer) throws Exception { + final AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer); + + final GridDhtPartitionTopology top = + ((IgniteKernal)ignite).context().cache().context().cacheContext(1).topology(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return top.rebalanceFinished(topVer0); + } + }, 5000); + + assertTrue(top.rebalanceFinished(topVer0)); + } + + /** + * @throws Exception If failed. + */ public void testOneBackup() throws Exception { - checkBackupQueue(1); + checkBackupQueue(1, false); + } + + /** + * @throws Exception If failed. + */ + public void testOneBackupClientUpdate() throws Exception { + checkBackupQueue(1, true); } /** @@ -146,14 +232,15 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo if (cacheMode() == REPLICATED) return; - checkBackupQueue(3); + checkBackupQueue(3, false); } /** * @param backups Number of backups. + * @param updateFromClient If {@code true} executes cache update from client node. * @throws Exception If failed. */ - private void checkBackupQueue(int backups) throws Exception { + private void checkBackupQueue(int backups, boolean updateFromClient) throws Exception { this.backups = backups; final int SRV_NODES = 4; @@ -183,6 +270,10 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo int PARTS = 10; + Map<Object, T2<Object, Object>> updates = new HashMap<>(); + + List<T3<Object, Object, Object>> expEvts = new ArrayList<>(); + for (int i = 0; i < SRV_NODES - 1; i++) { log.info("Stop iteration: " + i); @@ -203,7 +294,23 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo for (Integer key : keys) { log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']'); - cache.put(key, key); + T2<Object, Object> t = updates.get(key); + + if (t == null) { + updates.put(key, new T2<>((Object)key, null)); + + expEvts.add(new T3<>((Object)key, (Object)key, null)); + } + else { + updates.put(key, new T2<>((Object)key, (Object)key)); + + expEvts.add(new T3<>((Object)key, (Object)key, (Object)key)); + } + + if (updateFromClient) + qryClientCache.put(key, key); + else + cache.put(key, key); if (first) { spi.skipMsg = true; @@ -223,6 +330,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); } + + checkEvents(expEvts, lsnr); } for (int i = 0; i < SRV_NODES - 1; i++) { @@ -241,7 +350,23 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo for (Integer key : keys) { log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']'); - cache.put(key, key); + T2<Object, Object> t = updates.get(key); + + if (t == null) { + updates.put(key, new T2<>((Object)key, null)); + + expEvts.add(new T3<>((Object)key, (Object)key, null)); + } + else { + updates.put(key, new T2<>((Object)key, (Object)key)); + + expEvts.add(new T3<>((Object)key, (Object)key, (Object)key)); + } + + if (updateFromClient) + qryClientCache.put(key, key); + else + cache.put(key, key); } if (!latch.await(5, SECONDS)) { @@ -253,6 +378,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); } + + checkEvents(expEvts, lsnr); } cur.close(); @@ -261,6 +388,24 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo } /** + * @param expEvts Expected events. + * @param lsnr Listener. + */ + private void checkEvents(List<T3<Object, Object, Object>> expEvts, CacheEventListener1 lsnr) { + for (T3<Object, Object, Object> exp : expEvts) { + CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1()); + + assertNotNull("No event for key: " + exp.get1(), e); + assertEquals("Unexpected value: " + e, exp.get2(), e.getValue()); + assertEquals("Unexpected old value: " + e, exp.get3(), e.getOldValue()); + } + + expEvts.clear(); + + lsnr.evts.clear(); + } + + /** * @param cache Cache. * @param parts Number of partitions. * @return Keys. @@ -449,7 +594,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo /** * @throws Exception If failed. */ - public void _testFailover() throws Exception { + public void testFailover() throws Exception { final int SRV_NODES = 4; startGridsMultiThreaded(SRV_NODES); @@ -483,14 +628,12 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo startGrid(idx); - Thread.sleep(2000); + Thread.sleep(3000); log.info("Stop node: " + idx); stopGrid(idx); - Thread.sleep(1000); - CountDownLatch latch = new CountDownLatch(1); assertTrue(checkLatch.compareAndSet(null, latch)); @@ -508,6 +651,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo final Map<Integer, Integer> vals = new HashMap<>(); + final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>(); + try { long stopTime = System.currentTimeMillis() + 3 * 60_000; @@ -518,6 +663,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo while (System.currentTimeMillis() < stopTime) { Integer key = rnd.nextInt(PARTS); + Integer prevVal = vals.get(key); Integer val = vals.get(key); if (val == null) @@ -529,6 +675,16 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo vals.put(key, val); + List<T2<Integer, Integer>> keyEvts = expEvts.get(key); + + if (keyEvts == null) { + keyEvts = new ArrayList<>(); + + expEvts.put(key, keyEvts); + } + + keyEvts.add(new T2<>(val, prevVal)); + CountDownLatch latch = checkLatch.get(); if (latch != null) { @@ -544,12 +700,12 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return checkEvents(false, vals, lsnr); + return checkEvents(false, expEvts, lsnr); } }, 10_000); if (!check) - assertTrue(checkEvents(true, vals, lsnr)); + assertTrue(checkEvents(true, expEvts, lsnr)); success = true; @@ -577,12 +733,12 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return checkEvents(false, vals, lsnr); + return checkEvents(false, expEvts, lsnr); } }, 10_000); if (!check) - assertTrue(checkEvents(true, vals, lsnr)); + assertTrue(checkEvents(true, expEvts, lsnr)); cur.close(); @@ -591,47 +747,64 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo /** * @param logAll If {@code true} logs all unexpected values. - * @param vals Expected values. + * @param expEvts Expected values. * @param lsnr Listener. * @return Check status. */ - private boolean checkEvents(boolean logAll, Map<Integer, Integer> vals, CacheEventListener2 lsnr) { - assertTrue(!vals.isEmpty()); - - ConcurrentHashMap<Integer, Integer> lsnrVals = lsnr.vals; - - ConcurrentHashMap<Integer, Integer> lsnrCntrs = lsnr.cntrs; + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + private boolean checkEvents(boolean logAll, + Map<Integer, List<T2<Integer, Integer>>> expEvts, + CacheEventListener2 lsnr) { + assertTrue(!expEvts.isEmpty()); boolean pass = true; - for (Map.Entry<Integer, Integer> e : vals.entrySet()) { + for (Map.Entry<Integer, List<T2<Integer, Integer>>> e : expEvts.entrySet()) { Integer key = e.getKey(); + List<T2<Integer, Integer>> exp = e.getValue(); - Integer lsnrVal = lsnrVals.get(key); - Integer expVal = e.getValue(); + List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(key); - if (!expVal.equals(lsnrVal)) { + if (rcvdEvts == null) { pass = false; - log.info("Unexpected value [key=" + key + ", val=" + lsnrVal + ", expVal=" + expVal + ']'); + log.info("No events for key [key=" + key + ", exp=" + e.getValue() + ']'); if (!logAll) return false; } + else { + synchronized (rcvdEvts) { + if (rcvdEvts.size() != exp.size()) { + pass = false; - Integer lsnrCntr = lsnrCntrs.get(key); - Integer expCntr = expVal + 1; + log.info("Missed or extra events for key [key=" + key + + ", exp=" + e.getValue() + + ", rcvd=" + rcvdEvts + ']'); - if (!expCntr.equals(lsnrCntr)) { - pass = false; + if (!logAll) + return false; + } - log.info("Unexpected events count [key=" + key + ", val=" + lsnrCntr + ", expVal=" + expCntr + ']'); + int cnt = Math.min(rcvdEvts.size(), exp.size()); - if (!logAll) - return false; + for (int i = 0; i < cnt; i++) { + T2<Integer, Integer> expEvt = exp.get(i); + CacheEntryEvent<?, ?> rcvdEvt = rcvdEvts.get(i); + + assertEquals(key, rcvdEvt.getKey()); + assertEquals(expEvt.get1(), rcvdEvt.getValue()); + assertEquals(expEvt.get2(), rcvdEvt.getOldValue()); + } + } } } + if (pass) { + expEvts.clear(); + lsnr.evts.clear(); + } + return pass; } @@ -646,6 +819,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo private GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>(); /** */ + private ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>(); + + /** */ @LoggerResource private IgniteLogger log; @@ -657,6 +833,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo log.info("Received cache event: " + evt + " " + (latch != null ? latch.getCount() : null)); + this.evts.put(evt.getKey(), evt); + keys.add((Integer) evt.getKey()); assertTrue(latch != null); @@ -691,7 +869,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo private final ConcurrentHashMap<Integer, Integer> vals = new ConcurrentHashMap<>(); /** */ - private final ConcurrentHashMap<Integer, Integer> cntrs = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Integer, List<CacheEntryEvent<?, ?>>> evts = new ConcurrentHashMap<>(); /** {@inheritDoc} */ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) @@ -706,25 +884,34 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo Integer prevVal = vals.get(key); + boolean dup = false; + if (prevVal != null) { - assertEquals("Unexpected event: " + evt, (Integer)(prevVal + 1), val); - assertEquals("Unexpected event: " + evt, prevVal, evt.getOldValue()); + if (prevVal.equals(val)) // Can get this event with automatic put retry. + dup = true; + else { + assertEquals("Unexpected event: " + evt, (Integer)(prevVal + 1), val); + assertEquals("Unexpected event: " + evt, prevVal, evt.getOldValue()); + } } else { assertEquals("Unexpected event: " + evt, (Object)0, val); assertNull("Unexpected event: " + evt, evt.getOldValue()); } - vals.put(key, val); + if (!dup) { + vals.put(key, val); - Integer cntr = cntrs.get(key); + List<CacheEntryEvent<?, ?>> keyEvts = this.evts.get(key); - if (cntr == null) - cntr = 1; - else - cntr = cntr + 1; + if (keyEvts == null) { + keyEvts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>()); - cntrs.put(key, cntr); + this.evts.put(key, keyEvts); + } + + keyEvts.add(evt); + } } } catch (Throwable e) {