http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 84c9287..a12de1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.conflict.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; @@ -62,6 +63,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*; /** * Non-transactional partitioned cache. */ +@SuppressWarnings("unchecked") @GridToStringExclude public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** */ @@ -125,6 +127,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { }); updateReplyClos = new CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>>() { + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") @Override public void apply(GridNearAtomicUpdateRequest<K, V> req, GridNearAtomicUpdateResponse<K, V> res) { if (ctx.config().getAtomicWriteOrderMode() == CLOCK) { // Always send reply in CLOCK ordering mode. @@ -237,6 +240,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") @Override public V peek(K key, @Nullable Collection<GridCachePeekMode> modes) throws IgniteCheckedException { GridTuple<V> val = null; @@ -453,18 +457,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void putAllDr(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { - putAllDrAsync(drMap).get(); + @Override public void putAllConflict(Map<? extends K, GridCacheDrInfo<V>> conflictMap) + throws IgniteCheckedException { + putAllConflictAsync(conflictMap).get(); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) { - ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); + @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<? extends K, GridCacheDrInfo<V>> conflictMap) { + // TODO IGNITE-283: Invalid metrics update in case of local store. + ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); return updateAllAsync0(null, null, null, - drMap, + conflictMap, null, false, false, @@ -534,15 +540,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void removeAllDr(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { - removeAllDrAsync(drMap).get(); + @Override public void removeAllConflict(Map<? extends K, GridCacheVersion> conflictMap) + throws IgniteCheckedException { + removeAllConflictAsync(conflictMap).get(); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> removeAllDrAsync(Map<? extends K, GridCacheVersion> drMap) { - ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); + @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> conflictMap) { + // TODO: IGNITE-283: Invalid metrics update in case of local store. + ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); - return removeAllAsync0(null, drMap, null, false, false, null); + return removeAllAsync0(null, conflictMap, null, false, false, null); } /** @@ -727,23 +735,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * Entry point for all public API put/transform methods. * - * @param map Put map. Either {@code map}, {@code invokeMap} or {@code drMap} should be passed. - * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or {@code drMap} should be passed. + * @param map Put map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. + * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. * @param invokeArgs Optional arguments for EntryProcessor. - * @param drPutMap DR put map. - * @param drRmvMap DR remove map. + * @param conflictPutMap Conflict put map. + * @param conflictRmvMap Conflict remove map. * @param retval Return value required flag. * @param rawRetval Return {@code GridCacheReturn} instance. * @param cached Cached cache entry for key. May be passed if and only if map size is {@code 1}. * @param filter Cache entry filter for atomic updates. * @return Completion future. */ + @SuppressWarnings("ConstantConditions") private IgniteInternalFuture updateAllAsync0( @Nullable final Map<? extends K, ? extends V> map, @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap, @Nullable Object[] invokeArgs, - @Nullable final Map<? extends K, GridCacheDrInfo<V>> drPutMap, - @Nullable final Map<? extends K, GridCacheVersion> drRmvMap, + @Nullable final Map<? extends K, GridCacheDrInfo<V>> conflictPutMap, + @Nullable final Map<? extends K, GridCacheVersion> conflictRmvMap, final boolean retval, final boolean rawRetval, @Nullable GridCacheEntryEx<K, V> cached, @@ -765,12 +774,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { this, ctx.config().getWriteSynchronizationMode(), invokeMap != null ? TRANSFORM : UPDATE, - map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : drPutMap != null ? - drPutMap.keySet() : drRmvMap.keySet(), + map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ? + conflictPutMap.keySet() : conflictRmvMap.keySet(), map != null ? map.values() : invokeMap != null ? invokeMap.values() : null, invokeArgs, - drPutMap != null ? drPutMap.values() : null, - drRmvMap != null ? drRmvMap.values() : null, + conflictPutMap != null ? conflictPutMap.values() : null, + conflictRmvMap != null ? conflictRmvMap.values() : null, retval, rawRetval, cached, @@ -792,7 +801,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * Entry point for all public API remove methods. * * @param keys Keys to remove. - * @param drMap DR map. + * @param conflictMap Conflict map. * @param cached Cached cache entry for key. May be passed if and only if keys size is {@code 1}. * @param retval Return value required flag. * @param rawRetval Return {@code GridCacheReturn} instance. @@ -801,7 +810,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ private IgniteInternalFuture removeAllAsync0( @Nullable final Collection<? extends K> keys, - @Nullable final Map<? extends K, GridCacheVersion> drMap, + @Nullable final Map<? extends K, GridCacheVersion> conflictMap, @Nullable GridCacheEntryEx<K, V> cached, final boolean retval, boolean rawRetval, @@ -811,7 +820,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final long start = statsEnabled ? System.nanoTime() : 0L; - assert keys != null || drMap != null; + assert keys != null || conflictMap != null; if (keyCheck) validateCacheKeys(keys); @@ -829,11 +838,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { this, ctx.config().getWriteSynchronizationMode(), DELETE, - keys != null ? keys : drMap.keySet(), + keys != null ? keys : conflictMap.keySet(), null, null, null, - keys != null ? null : drMap.values(), + keys != null ? null : conflictMap.values(), retval, rawRetval, cached, @@ -1004,13 +1013,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * * @param nodeId Node ID. * @param req Update request. - * @param cached Cached entry if updating single local entry. * @param completionCb Completion callback. */ public void updateAllAsyncInternal( final UUID nodeId, final GridNearAtomicUpdateRequest<K, V> req, - @Nullable final GridCacheEntryEx<K, V> cached, final CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb ) { IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion()); @@ -1097,16 +1104,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { dhtFut = createDhtFuture(ver, req, res, completionCb, false); - boolean replicate = ctx.isDrEnabled(); - expiry = expiryPolicy(req.expiry()); GridCacheReturn<Object> retVal = null; + // TODO: IGNITE-283: Merge local store and DR checks into single method inside ctx. if (keys.size() > 1 && // Several keys ... writeThrough() && // and store is enabled ... !ctx.store().isLocalStore() && // and this is not local store ... - !ctx.dr().receiveEnabled() // and no DR. + !ctx.dr().receiveEnabled() // and no DR. ) { // This method can only be used when there are no replicated entries in the batch. UpdateBatchResult<K, V> updRes = updateWithBatch(node, @@ -1117,7 +1123,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ver, dhtFut, completionCb, - replicate, + ctx.isDrEnabled(), taskName, expiry); @@ -1136,7 +1142,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ver, dhtFut, completionCb, - replicate, + ctx.isDrEnabled(), taskName, expiry); @@ -1642,14 +1648,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (entry == null) continue; - GridCacheVersion newDrVer = req.drVersion(i); - long newDrTtl = req.drTtl(i); - long newDrExpireTime = req.drExpireTime(i); + GridCacheConflictInfo newConflictInfo = req.conflictInfo(i); + + GridCacheVersion newConflictVer; + long newConflictTtl; + long newConflictExpireTime; - assert !(newDrVer instanceof GridCacheVersionEx) : newDrVer; // Plain version is expected here. + if (newConflictInfo != null) { + newConflictVer = newConflictInfo.version(); + newConflictTtl = newConflictInfo.ttl(); + newConflictExpireTime = newConflictInfo.expireTime(); + } + else { + newConflictVer = null; + newConflictTtl = CU.TTL_NOT_CHANGED; + newConflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + } - if (newDrVer == null) - newDrVer = ver; + // Plain version is expected here. + assert newConflictInfo == null || + (newConflictInfo.version() != null && !(newConflictInfo.version() instanceof GridCacheVersionEx)); boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(), req.topologyVersion()); @@ -1683,10 +1701,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node. req.filter(), replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE, - newDrTtl, - newDrExpireTime, - newDrVer, - true, + new GridCacheConflictInnerUpdate(true, newConflictVer, newConflictTtl, newConflictExpireTime), intercept, req.subjectId(), taskName); @@ -1699,17 +1714,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (dhtFut != null) { if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. - GridCacheVersionConflictContext<K, V> ctx = updRes.drResolveResult(); + GridCacheVersionConflictContext<K, V> conflictCtx = updRes.conflictResolveResult(); - long ttl = updRes.newTtl(); - long expireTime = updRes.drExpireTime(); + if (conflictCtx != null) { + assert newConflictInfo != null; - if (ctx == null) - newDrVer = null; - else if (ctx.isMerge()) { - newDrVer = null; // DR version is discarded in case of merge. - newValBytes = null; // Value has been changed. + if (conflictCtx.isMerge()) + newConflictVer = null; // Conflict version is discarded in case of merge. } + else + assert newConflictInfo == null; EntryProcessor<K, V, ?> entryProcessor = null; @@ -1719,21 +1733,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (!readersOnly) { dhtFut.addWriteEntry(entry, updRes.newValue(), - newValBytes, + updRes.newValueBytes(), entryProcessor, updRes.newTtl(), - expireTime, - newDrVer); + updRes.conflictExpireTime(), + newConflictVer); } if (!F.isEmpty(filteredReaders)) dhtFut.addNearWriteEntries(filteredReaders, entry, updRes.newValue(), - newValBytes, + updRes.newValueBytes(), entryProcessor, - ttl, - expireTime); + updRes.newTtl(), + updRes.conflictExpireTime()); } else { if (log.isDebugEnabled()) @@ -1745,26 +1759,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (hasNear) { if (primary && updRes.sendToDht()) { if (!ctx.affinity().belongs(node, entry.partition(), topVer)) { - GridCacheVersionConflictContext<K, V> ctx = updRes.drResolveResult(); - - long ttl = updRes.newTtl(); - long expireTime = updRes.drExpireTime(); - - if (ctx != null && ctx.isMerge()) - newValBytes = null; - // If put the same value as in request then do not need to send it back. if (op == TRANSFORM || writeVal != updRes.newValue()) { res.addNearValue(i, updRes.newValue(), - newValBytes, - ttl, - expireTime); + updRes.newValueBytes(), + updRes.newTtl(), + updRes.conflictExpireTime()); } else - res.addNearTtl(i, ttl, expireTime); + res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime()); - if (updRes.newValue() != null || newValBytes != null) { + if (updRes.newValue() != null || updRes.newValueBytes() != null) { IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); assert f == null : f; @@ -1856,7 +1862,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ) { assert putMap == null ^ rmvKeys == null; - assert req.drVersions() == null : "updatePartialBatch cannot be called when there are DR entries in the batch."; + assert req.conflictInfos() == null : "Cannot be called when there are conflict entries in the batch."; long topVer = req.topologyVersion(); @@ -1961,15 +1967,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node. null, replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE, - -1L, - -1L, - null, - false, + new GridCacheConflictInnerUpdate(false, null, CU.TTL_NOT_CHANGED, CU.EXPIRE_TIME_CALCULATE), false, req.subjectId(), taskName); - assert updRes.newTtl() == -1L || expiry != null; + assert updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null; if (intercept) { if (op == UPDATE) @@ -2032,10 +2035,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { writeVal, valBytes, updRes.newTtl(), - -1); + CU.EXPIRE_TIME_CALCULATE); } else - res.addNearTtl(idx, updRes.newTtl(), -1); + res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); if (writeVal != null || !entry.valueBytes().isNull()) { IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); @@ -2262,7 +2265,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Collection<GridCacheDrInfo<V>> drPutVals; Collection<GridCacheVersion> drRmvVals; - if (req.drVersions() == null) { + if (req.conflictInfos() == null) { + // This is regular PUT, i.e. no conflicts. vals = req.values(); drPutVals = null; @@ -2274,13 +2278,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { drPutVals = new ArrayList<>(size); for (int i = 0; i < size; i++) { - long ttl = req.drTtl(i); + GridCacheConflictInfo conflictInfo = req.conflictInfo(i); + + assert conflictInfo != null; - if (ttl == -1L) - drPutVals.add(new GridCacheDrInfo<>(req.value(i), req.drVersion(i))); + if (conflictInfo.hasExpirationInfo()) + drPutVals.add(new GridCacheDrExpirationInfo<>(req.value(i), conflictInfo.version(), + conflictInfo.ttl(), conflictInfo.expireTime())); else - drPutVals.add(new GridCacheDrExpirationInfo<>(req.value(i), req.drVersion(i), ttl, - req.drExpireTime(i))); + drPutVals.add(new GridCacheDrInfo<>(req.value(i), conflictInfo.version())); } vals = null; @@ -2289,7 +2295,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { else { assert req.operation() == DELETE; - drRmvVals = req.drVersions(); + List<GridCacheConflictInfo> conflictInfos = req.conflictInfos(); + + assert conflictInfos != null; + + drRmvVals = F.viewReadOnly(conflictInfos, new IgniteClosure<GridCacheConflictInfo, GridCacheVersion>() { + @Override public GridCacheVersion apply(GridCacheConflictInfo conflictInfo) { + return conflictInfo.version(); + } + } + ); vals = null; drPutVals = null; @@ -2392,7 +2407,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.nodeId(ctx.localNodeId()); - updateAllAsyncInternal(nodeId, req, null, updateReplyClos); + updateAllAsyncInternal(nodeId, req, updateReplyClos); } /** @@ -2454,10 +2469,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { DELETE; long ttl = req.ttl(i); - long expireTime = req.drExpireTime(i); - - if (ttl != -1L && expireTime == -1L) - expireTime = CU.toExpireTime(ttl); + long expireTime = req.conflictExpireTime(i); GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate( ver, @@ -2476,10 +2488,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /*check version*/!req.forceTransformBackups(), CU.<K, V>empty(), replicate ? DR_BACKUP : DR_NONE, - ttl, - expireTime, - req.drVersion(i), - false, + new GridCacheConflictInnerUpdate(false, req.conflictVersion(i), ttl, expireTime), intercept, req.subjectId(), taskName);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index a4bda8e..92fe74b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -210,16 +210,16 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> * @param valBytes Value bytes. * @param entryProcessor Entry processor. * @param ttl TTL (optional). - * @param drExpireTime DR expire time (optional). - * @param drVer DR version (optional). + * @param conflictExpireTime Conflict expire time (optional). + * @param conflictVer Conflict version (optional). */ public void addWriteEntry(GridDhtCacheEntry<K, V> entry, @Nullable V val, @Nullable byte[] valBytes, EntryProcessor<K, V, ?> entryProcessor, long ttl, - long drExpireTime, - @Nullable GridCacheVersion drVer) { + long conflictExpireTime, + @Nullable GridCacheVersion conflictVer) { long topVer = updateReq.topologyVersion(); Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer); @@ -259,8 +259,8 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> valBytes, entryProcessor, ttl, - drExpireTime, - drVer); + conflictExpireTime, + conflictVer); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 3f3e359..97163fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -552,7 +552,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp * @param idx Index. * @return DR version. */ - @Nullable public GridCacheVersion drVersion(int idx) { + @Nullable public GridCacheVersion conflictVersion(int idx) { if (drVers != null) { assert idx >= 0 && idx < drVers.size(); @@ -601,7 +601,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp * @param idx Index. * @return DR TTL. */ - public long drExpireTime(int idx) { + public long conflictExpireTime(int idx) { if (drExpireTimes != null) { assert idx >= 0 && idx < drExpireTimes.size(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 5c65404..83344cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.conflict.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.dr.*; @@ -87,11 +88,11 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> /** DR put values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private Collection<GridCacheDrInfo<V>> drPutVals; + private Collection<GridCacheDrInfo<V>> conflictPutVals; /** DR remove values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private Collection<GridCacheVersion> drRmvVals; + private Collection<GridCacheVersion> conflictRmvVals; /** Mappings. */ @GridToStringInclude @@ -174,8 +175,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param keys Keys to update. * @param vals Values or transform closure. * @param invokeArgs Optional arguments for entry processor. - * @param drPutVals DR put values (optional). - * @param drRmvVals DR remove values (optional). + * @param conflictPutVals Conflict put values (optional). + * @param conflictRmvVals Conflict remove values (optional). * @param retval Return value require flag. * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result. * @param cached Cached entry if keys size is 1. @@ -192,8 +193,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> Collection<? extends K> keys, @Nullable Collection<?> vals, @Nullable Object[] invokeArgs, - @Nullable Collection<GridCacheDrInfo<V>> drPutVals, - @Nullable Collection<GridCacheVersion> drRmvVals, + @Nullable Collection<GridCacheDrInfo<V>> conflictPutVals, + @Nullable Collection<GridCacheVersion> conflictRmvVals, final boolean retval, final boolean rawRetval, @Nullable GridCacheEntryEx<K, V> cached, @@ -207,8 +208,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> this.rawRetval = rawRetval; assert vals == null || vals.size() == keys.size(); - assert drPutVals == null || drPutVals.size() == keys.size(); - assert drRmvVals == null || drRmvVals.size() == keys.size(); + assert conflictPutVals == null || conflictPutVals.size() == keys.size(); + assert conflictRmvVals == null || conflictRmvVals.size() == keys.size(); assert cached == null || keys.size() == 1; assert subjId != null; @@ -219,8 +220,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> this.keys = keys; this.vals = vals; this.invokeArgs = invokeArgs; - this.drPutVals = drPutVals; - this.drRmvVals = drRmvVals; + this.conflictPutVals = conflictPutVals; + this.conflictRmvVals = conflictRmvVals; this.retval = retval; this.cached = cached; this.expiryPlc = expiryPlc; @@ -520,35 +521,31 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> K key = F.first(keys); Object val; - long drTtl; - long drExpireTime; - GridCacheVersion drVer; + GridCacheConflictInfo conflictInfo; if (vals != null) { + // Regular PUT. val = F.first(vals); - drTtl = -1; - drExpireTime = -1; - drVer = null; + conflictInfo = null; } - else if (drPutVals != null) { - GridCacheDrInfo<V> drPutVal = F.first(drPutVals); + else if (conflictPutVals != null) { + // Conflict PUT. + GridCacheDrInfo<V> conflictPutVal = F.first(conflictPutVals); - val = drPutVal.value(); - drTtl = drPutVal.ttl(); - drExpireTime = drPutVal.expireTime(); - drVer = drPutVal.version(); + val = conflictPutVal.value(); + conflictInfo = GridCacheConflictInfo.create(conflictPutVal.version(), conflictPutVal.ttl(), + conflictPutVal.expireTime()); } - else if (drRmvVals != null) { + else if (conflictRmvVals != null) { + // Conflict REMOVE. val = null; - drTtl = -1; - drExpireTime = -1; - drVer = F.first(drRmvVals); + conflictInfo = GridCacheConflictInfo.create(F.first(conflictRmvVals), CU.TTL_NOT_CHANGED, + CU.EXPIRE_TIME_CALCULATE); } else { + // Regular REMOVE. val = null; - drTtl = -1; - drExpireTime = -1; - drVer = null; + conflictInfo = null; } // We still can get here if user pass map with single element. @@ -599,7 +596,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> subjId, taskNameHash); - req.addUpdateEntry(key, val, drTtl, drExpireTime, drVer, true); + req.addUpdateEntry(key, val, conflictInfo, true); single = true; @@ -614,15 +611,15 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> if (vals != null) it = vals.iterator(); - Iterator<GridCacheDrInfo<V>> drPutValsIt = null; + Iterator<GridCacheDrInfo<V>> conflictPutValsIt = null; - if (drPutVals != null) - drPutValsIt = drPutVals.iterator(); + if (conflictPutVals != null) + conflictPutValsIt = conflictPutVals.iterator(); - Iterator<GridCacheVersion> drRmvValsIt = null; + Iterator<GridCacheVersion> conflictRmvValsIt = null; - if (drRmvVals != null) - drRmvValsIt = drRmvVals.iterator(); + if (conflictRmvVals != null) + conflictRmvValsIt = conflictRmvVals.iterator(); Map<UUID, GridNearAtomicUpdateRequest<K, V>> pendingMappings = new HashMap<>(topNodes.size(), 1.0f); @@ -643,15 +640,11 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> } Object val; - long drTtl; - long drExpireTime; - GridCacheVersion drVer; + GridCacheConflictInfo conflictInfo; if (vals != null) { val = it.next(); - drTtl = -1; - drExpireTime = -1; - drVer = null; + conflictInfo = null; if (val == null) { NullPointerException err = new NullPointerException("Null value."); @@ -661,25 +654,21 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> throw err; } } - else if (drPutVals != null) { - GridCacheDrInfo<V> drPutVal = drPutValsIt.next(); + else if (conflictPutVals != null) { + GridCacheDrInfo<V> conflictPutVal = conflictPutValsIt.next(); - val = drPutVal.value(); - drTtl = drPutVal.ttl(); - drExpireTime = drPutVal.expireTime(); - drVer = drPutVal.version(); + val = conflictPutVal.value(); + conflictInfo = GridCacheConflictInfo.create(conflictPutVal.version(), conflictPutVal.ttl(), + conflictPutVal.expireTime()); } - else if (drRmvVals != null) { + else if (conflictRmvVals != null) { val = null; - drTtl = -1; - drExpireTime = -1; - drVer = drRmvValsIt.next(); + conflictInfo = GridCacheConflictInfo.create(conflictRmvValsIt.next(), CU.TTL_NOT_CHANGED, + CU.EXPIRE_TIME_CALCULATE); } else { val = null; - drTtl = -1; - drExpireTime = -1; - drVer = null; + conflictInfo = null; } if (val == null && op != GridCacheOperation.DELETE) @@ -727,7 +716,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> "Invalid mapping state [old=" + old + ", remap=" + remap + ']'; } - mapped.addUpdateEntry(key, val, drTtl, drExpireTime, drVer, i == 0); + mapped.addUpdateEntry(key, val, conflictInfo, i == 0); i++; } @@ -778,7 +767,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> singleReq = req; if (ctx.localNodeId().equals(nodeId)) { - cache.updateAllAsyncInternal(nodeId, req, cached, + cache.updateAllAsyncInternal(nodeId, req, new CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>>() { @Override public void apply(GridNearAtomicUpdateRequest<K, V> req, GridNearAtomicUpdateResponse<K, V> res) { @@ -845,7 +834,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> opRes = new GridCacheReturn<>(null, true); if (locUpdate != null) { - cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, cached, + cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, new CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>>() { @Override public void apply(GridNearAtomicUpdateRequest<K, V> req, GridNearAtomicUpdateResponse<K, V> res) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index c206264..0de8308 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -21,9 +21,9 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.conflict.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -96,14 +96,8 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im private byte[][] invokeArgsBytes; /** DR versions. */ - @GridDirectCollection(GridCacheVersion.class) - private List<GridCacheVersion> drVers; - - /** DR TTLs. */ - private GridLongList drTtls; - - /** DR TTLs. */ - private GridLongList drExpireTimes; + @GridDirectCollection(GridCacheConflictInfo.class) + private List<GridCacheConflictInfo> conflictInfos; /** Return value flag. */ private boolean retval; @@ -290,16 +284,12 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im /** * @param key Key to add. * @param val Optional update value. - * @param drTtl DR TTL (optional). - * @param drExpireTime DR expire time (optional). - * @param drVer DR version (optional). + * @param conflictInfo Conflict info (optional). * @param primary If given key is primary on this mapping. */ public void addUpdateEntry(K key, @Nullable Object val, - long drTtl, - long drExpireTime, - @Nullable GridCacheVersion drVer, + @Nullable GridCacheConflictInfo conflictInfo, boolean primary) { assert val != null || op == DELETE; assert op != TRANSFORM || val instanceof EntryProcessor; @@ -310,40 +300,18 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im hasPrimary |= primary; // In case there is no DR, do not create the list. - if (drVer != null) { - if (drVers == null) { - drVers = new ArrayList<>(); - - for (int i = 0; i < keys.size() - 1; i++) - drVers.add(null); - } - - drVers.add(drVer); - } - else if (drVers != null) - drVers.add(drVer); - - if (drTtl >= 0) { - if (drTtls == null) { - drTtls = new GridLongList(keys.size()); + if (conflictInfo != null) { + if (conflictInfos == null) { + conflictInfos = new ArrayList<>(); for (int i = 0; i < keys.size() - 1; i++) - drTtls.add(-1); + conflictInfos.add(null); } - drTtls.add(drTtl); - } - - if (drExpireTime >= 0) { - if (drExpireTimes == null) { - drExpireTimes = new GridLongList(keys.size()); - - for (int i = 0; i < keys.size() - 1; i++) - drExpireTimes.add(-1); - } - - drExpireTimes.add(drExpireTime); + conflictInfos.add(conflictInfo); } + else if (conflictInfos != null) + conflictInfos.add(null); } /** @@ -378,6 +346,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im * @param idx Key index. * @return Value. */ + @SuppressWarnings("unchecked") public V value(int idx) { assert op == UPDATE : op; @@ -388,6 +357,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im * @param idx Key index. * @return Entry processor. */ + @SuppressWarnings("unchecked") public EntryProcessor<K, V, ?> entryProcessor(int idx) { assert op == TRANSFORM : op; @@ -434,67 +404,25 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im /** * @return DR versions. */ - @Nullable public List<GridCacheVersion> drVersions() { - return drVers; + @Nullable public List<GridCacheConflictInfo> conflictInfos() { + return conflictInfos; } /** * @param idx Index. * @return DR version. */ - @Nullable public GridCacheVersion drVersion(int idx) { - if (drVers != null) { - assert idx >= 0 && idx < drVers.size(); + @Nullable public GridCacheConflictInfo conflictInfo(int idx) { + if (conflictInfos != null) { + assert idx >= 0 && idx < conflictInfos.size(); - return drVers.get(idx); + return conflictInfos.get(idx); } return null; } /** - * @return DR TTLs. - */ - @Nullable public GridLongList drTtls() { - return drTtls; - } - - /** - * @param idx Index. - * @return DR TTL. - */ - public long drTtl(int idx) { - if (drTtls != null) { - assert idx >= 0 && idx < drTtls.size(); - - return drTtls.get(idx); - } - - return -1L; - } - - /** - * @return DR TTLs. - */ - @Nullable public GridLongList drExpireTimes() { - return drExpireTimes; - } - - /** - * @param idx Index. - * @return DR TTL. - */ - public long drExpireTime(int idx) { - if (drExpireTimes != null) { - assert idx >= 0 && idx < drExpireTimes.size(); - - return drExpireTimes.get(idx); - } - - return -1L; - } - - /** * @return Flag indicating whether this request contains primary keys. */ public boolean hasPrimary() { @@ -557,20 +485,8 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im } switch (writer.state()) { - case 3: - if (!writer.writeMessage("drExpireTimes", drExpireTimes)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeMessage("drTtls", drTtls)) - return false; - - writer.incrementState(); - case 5: - if (!writer.writeCollection("drVers", drVers, Type.MSG)) + if (!writer.writeCollection("drVers", conflictInfos, Type.MSG)) return false; writer.incrementState(); @@ -684,24 +600,8 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im return false; switch (readState) { - case 3: - drExpireTimes = reader.readMessage("drExpireTimes"); - - if (!reader.isLastRead()) - return false; - - readState++; - - case 4: - drTtls = reader.readMessage("drTtls"); - - if (!reader.isLastRead()) - return false; - - readState++; - case 5: - drVers = reader.readCollection("drVers", Type.MSG); + conflictInfos = reader.readCollection("drVers", Type.MSG); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 58bce51..5001810 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.conflict.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; import org.apache.ignite.internal.processors.cache.dr.*; @@ -163,8 +164,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { long ttl = res.nearTtl(i); long expireTime = res.nearExpireTime(i); - if (ttl != -1L && expireTime == -1L) - expireTime = GridCacheMapEntry.toExpireTime(ttl); + if (ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE) + expireTime = CU.toExpireTime(ttl); try { processNearAtomicUpdateResponse(ver, @@ -234,10 +235,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*check version*/true, CU.<K, V>empty(), DR_NONE, - ttl, - expireTime, - null, - false, + new GridCacheConflictInnerUpdate(false, null, ttl, expireTime), false, subjId, taskName); @@ -317,9 +315,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { long ttl = req.nearTtl(i); long expireTime = req.nearExpireTime(i); - if (ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE) - expireTime = GridCacheMapEntry.toExpireTime(ttl); - GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate( ver, nodeId, @@ -337,10 +332,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*check version*/!req.forceTransformBackups(), CU.<K, V>empty(), DR_NONE, - ttl, - expireTime, - null, - false, + new GridCacheConflictInnerUpdate(false, null, ttl, expireTime), intercept, req.subjectId(), taskName); @@ -526,13 +518,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void putAllDr(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { - dht.putAllDr(drMap); + @Override public void putAllConflict(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { + dht.putAllConflict(drMap); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { - return dht.putAllDrAsync(drMap); + @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { + return dht.putAllConflictAsync(drMap); } /** {@inheritDoc} */ @@ -645,13 +637,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void removeAllDr(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { - dht.removeAllDr(drMap); + @Override public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { + dht.removeAllConflict(drMap); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> removeAllDrAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { - return dht.removeAllDrAsync(drMap); + @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { + return dht.removeAllConflictAsync(drMap); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java index 0ab7f67..6c875d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java @@ -74,14 +74,14 @@ public class GridCacheDrInfo<V> implements Externalizable { * @return TTL. */ public long ttl() { - return 0L; + return CU.TTL_ETERNAL; } /** * @return Expire time. */ public long expireTime() { - return 0L; + return CU.EXPIRE_TIME_ETERNAL; } /** {@inheritDoc} */ @@ -96,6 +96,7 @@ public class GridCacheDrInfo<V> implements Externalizable { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { val = (V)in.readObject(); ver = CU.readVersion(in); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 3a63236..fc19db4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -763,7 +763,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } } - boolean drNeedResolve = cacheCtx.conflictNeedResolve(cached.version(), explicitVer); + boolean drNeedResolve = cacheCtx.conflictNeedResolve(); if (drNeedResolve) { IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java index 53e43e2..b706e92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java @@ -149,9 +149,9 @@ public class GridCacheVersion extends MessageAdapter implements Comparable<GridC } /** - * @return DR version. + * @return Conflict version. */ - @Nullable public GridCacheVersion drVersion() { + @Nullable public GridCacheVersion conflictVersion() { return this; // Use current version. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java index b813803..862efc2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java @@ -43,9 +43,6 @@ public class GridCacheVersionConflictContext<K, V> { /** TTL. */ private long ttl; - /** Explicit TTL flag. */ - private boolean explicitTtl; - /** Manual resolve flag. */ private boolean manualResolve; @@ -110,8 +107,7 @@ public class GridCacheVersionConflictContext<K, V> { public void useNew() { state = State.USE_NEW; - if (!explicitTtl) - ttl = newEntry.ttl(); + ttl = newEntry.ttl(); } /** @@ -121,15 +117,16 @@ public class GridCacheVersionConflictContext<K, V> { * Also in case of merge you have to specify new TTL explicitly. For unlimited TTL use {@code 0}. * * @param mergeVal Merge value or {@code null} to force remove. - * @param ttl Time to live in milliseconds. + * @param ttl Time to live in milliseconds (must be non-negative). */ public void merge(@Nullable V mergeVal, long ttl) { + if (ttl < 0) + throw new IllegalArgumentException("TTL must be non-negative: " + ttl); + state = State.MERGE; this.mergeVal = mergeVal; this.ttl = ttl; - - explicitTtl = true; } /** @@ -185,15 +182,7 @@ public class GridCacheVersionConflictContext<K, V> { * @return Expire time. */ public long expireTime() { - return explicitTtl ? CU.toExpireTime(ttl) : isUseNew() ? newEntry.expireTime() : - isUseOld() ? oldEntry.expireTime() : 0L; - } - - /** - * @return Explicit TTL flag. - */ - public boolean explicitTtl() { - return explicitTtl; + return isUseNew() ? newEntry.expireTime() : isUseOld() ? oldEntry.expireTime() : CU.toExpireTime(ttl); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java index 8c95b9b..4f135ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java @@ -77,7 +77,7 @@ public class GridCacheVersionEx extends GridCacheVersion { } /** {@inheritDoc} */ - @Override public GridCacheVersion drVersion() { + @Override public GridCacheVersion conflictVersion() { return drVer; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java index 2cc97da..b8cfe77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java @@ -61,14 +61,18 @@ public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoader.Update K key = entry.key(); - GridCacheDrInfo<V> val = entry.value() != null ? entry.expireTime() != 0 ? + // Ensure that updater to not receive special-purpose values for TTL and expire time. + assert entry.ttl() != CU.TTL_NOT_CHANGED && entry.ttl() != CU.TTL_ZERO && entry.ttl() >= 0; + assert entry.expireTime() != CU.EXPIRE_TIME_CALCULATE && entry.expireTime() >= 0; + + GridCacheDrInfo<V> val = entry.value() != null ? entry.ttl() != CU.TTL_ETERNAL ? new GridCacheDrExpirationInfo<>(entry.value(), entry.version(), entry.ttl(), entry.expireTime()) : new GridCacheDrInfo<>(entry.value(), entry.version()) : null; if (val == null) - cache.removeAllDr(Collections.singletonMap(key, entry.version())); + cache.removeAllConflict(Collections.singletonMap(key, entry.version())); else - cache.putAllDr(Collections.singletonMap(key, val)); + cache.putAllConflict(Collections.singletonMap(key, val)); } if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 103b0ad..5db0da4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -8813,7 +8813,7 @@ public abstract class IgniteUtils { UNSAFE.putBoolean(arr, off++, verEx); if (verEx) { - GridCacheVersion drVer = ver.drVersion(); + GridCacheVersion drVer = ver.conflictVersion(); assert drVer != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 17d0bfc..2321f09 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.internal.processors.cache.conflict.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.dr.*; @@ -478,10 +479,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme boolean checkVer, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter, GridDrType drType, - long drTtl, - long drExpireTime, - @Nullable GridCacheVersion drVer, - boolean drResolve, + GridCacheConflictInnerUpdate conflict, boolean intercept, UUID subjId, String taskName) throws IgniteCheckedException, @@ -490,6 +488,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme rawPut((V)val, 0), (V)val, null, + null, 0L, 0L, null,