# ignite-283: Reworked innerUpdate.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b4045ce1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b4045ce1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b4045ce1 Branch: refs/heads/ignite-283 Commit: b4045ce14d28b6e72bdda057ac4953cf29280309 Parents: 362bf16 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Wed Feb 18 11:48:28 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Wed Feb 18 11:48:28 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 14 +- .../processors/cache/GridCacheContext.java | 4 +- .../processors/cache/GridCacheEntryEx.java | 12 +- .../processors/cache/GridCacheMapEntry.java | 357 +++++++++---------- .../processors/cache/GridCacheProjectionEx.java | 8 +- .../cache/GridCacheProjectionImpl.java | 16 +- .../processors/cache/GridCacheProxyImpl.java | 16 +- .../cache/GridCacheUpdateAtomicResult.java | 42 ++- .../processors/cache/GridCacheUtils.java | 14 +- .../cache/conflict/GridCacheConflictInfo.java | 53 +++ .../conflict/GridCacheConflictInnerUpdate.java | 88 +++++ .../conflict/GridCacheNoTtlConflictInfo.java | 60 ++++ .../conflict/GridCacheTtlConflictInfo.java | 78 ++++ .../GridDistributedTxRemoteAdapter.java | 3 +- .../dht/atomic/GridDhtAtomicCache.java | 195 +++++----- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 12 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 4 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 105 +++--- .../dht/atomic/GridNearAtomicUpdateRequest.java | 144 ++------ .../distributed/near/GridNearAtomicCache.java | 34 +- .../processors/cache/dr/GridCacheDrInfo.java | 5 +- .../transactions/IgniteTxLocalAdapter.java | 2 +- .../cache/version/GridCacheVersion.java | 4 +- .../GridCacheVersionConflictContext.java | 23 +- .../cache/version/GridCacheVersionEx.java | 2 +- .../dr/GridDrDataLoadCacheUpdater.java | 10 +- .../ignite/internal/util/IgniteUtils.java | 2 +- .../processors/cache/GridCacheTestEntryEx.java | 7 +- 28 files changed, 739 insertions(+), 575 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index fa3cb17..1daec2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2472,7 +2472,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public void putAllDr(final Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { + @Override public void putAllConflict(final Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { if (F.isEmpty(drMap)) return; @@ -2486,13 +2486,13 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } @Override public String toString() { - return "putAllDr [drMap=" + drMap + ']'; + return "putAllConflict [drMap=" + drMap + ']'; } }); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> putAllDrAsync(final Map<? extends K, GridCacheDrInfo<V>> drMap) + @Override public IgniteInternalFuture<?> putAllConflictAsync(final Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { if (F.isEmpty(drMap)) return new GridFinishedFuture<Object>(ctx.kernalContext()); @@ -2507,7 +2507,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } @Override public String toString() { - return "putAllDrAsync [drMap=" + drMap + ']'; + return "putAllConflictAsync [drMap=" + drMap + ']'; } }); } @@ -3379,7 +3379,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public void removeAllDr(final Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { + @Override public void removeAllConflict(final Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { ctx.denyOnLocalRead(); if (F.isEmpty(drMap)) @@ -3393,13 +3393,13 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } @Override public String toString() { - return "removeAllDr [drMap=" + drMap + ']'; + return "removeAllConflict [drMap=" + drMap + ']'; } }); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> removeAllDrAsync(final Map<? extends K, GridCacheVersion> drMap) + @Override public IgniteInternalFuture<?> removeAllConflictAsync(final Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { ctx.denyOnLocalRead(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 02624d7..060a825 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1582,11 +1582,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** * Check whether conflict resolution is required. * - * @param oldVer Old version. - * @param newVer New version. * @return {@code True} in case DR is required. */ - public boolean conflictNeedResolve(GridCacheVersion oldVer, GridCacheVersion newVer) { + public boolean conflictNeedResolve() { return conflictRslvr != null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index f044347..0502e2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.internal.processors.cache.conflict.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -423,9 +424,9 @@ public interface GridCacheEntryEx<K, V> { * greater than passed in. * @param filter Optional filter to check. * @param drType DR type. - * @param drTtl DR TTL (if any). - * @param drExpireTime DR expire time (if any). - * @param drVer DR version (if any). + * @param conflictTtl Conflict TTL (if any). + * @param conflictExpireTime Conflict expire time (if any). + * @param conflictVer DR version (if any). * @param drResolve If {@code true} then performs DR conflicts resolution. * @param intercept If {@code true} then calls cache interceptor. * @param subjId Subject ID initiated this update. @@ -455,10 +456,7 @@ public interface GridCacheEntryEx<K, V> { boolean checkVer, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter, GridDrType drType, - long drTtl, - long drExpireTime, - @Nullable GridCacheVersion drVer, - boolean drResolve, + GridCacheConflictInnerUpdate conflict, boolean intercept, @Nullable UUID subjId, String taskName http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index cc879cb..e1a0189 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.eviction.*; import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.cache.conflict.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.extras.*; import org.apache.ignite.internal.processors.cache.query.*; @@ -164,7 +165,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> this.hash = hash; this.cctx = cctx; - ttlAndExpireTimeExtras(ttl, toExpireTime(ttl)); + ttlAndExpireTimeExtras(ttl, CU.toExpireTime(ttl)); if (cctx.portableEnabled()) val = (V)cctx.kernalContext().portable().detachPortable(val); @@ -844,7 +845,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> V prevVal = rawGetOrUnmarshalUnlocked(false); - long expTime = toExpireTime(ttl); + long expTime = CU.toExpireTime(ttl); if (loadedFromStore) // Update indexes before actual write to entry. @@ -918,7 +919,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> V old = rawGetOrUnmarshalUnlocked(false); - long expTime = toExpireTime(ttl); + long expTime = CU.toExpireTime(ttl); // Detach value before index update. if (cctx.portableEnabled()) @@ -1048,7 +1049,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> expireTime = expireTimeExtras(); } else - expireTime = toExpireTime(ttl); + expireTime = CU.toExpireTime(ttl); } assert ttl >= 0 : ttl; @@ -1433,7 +1434,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> expireTime = expireTimeExtras(); } else - expireTime = toExpireTime(ttl); + expireTime = CU.toExpireTime(ttl); } else { ttl = ttlExtras(); @@ -1534,6 +1535,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public GridCacheUpdateAtomicResult<K, V> innerUpdate( GridCacheVersion newVer, UUID evtNodeId, @@ -1551,31 +1553,26 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> boolean verCheck, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter, GridDrType drType, - long drTtl, - long drExpireTime, - @Nullable GridCacheVersion drVer, - boolean drResolve, + GridCacheConflictInnerUpdate conflict, boolean intercept, @Nullable UUID subjId, String taskName ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { assert cctx.atomic(); - V old; - boolean res = true; + V oldVal = null; V updated; GridCacheVersion enqueueVer = null; - GridCacheVersionConflictContext<K, V> drRes = null; + GridCacheVersionConflictContext<K, V> conflictCtx = null; EntryProcessorResult<?> invokeRes = null; - long newTtl = -1L; - long newExpireTime = 0L; - long newDrExpireTime = -1L; // Explicit DR expire time which possibly will be sent to DHT node. + long newTtl = CU.TTL_NOT_CHANGED; + long newExpireTime = CU.EXPIRE_TIME_CALCULATE; synchronized (this) { boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); @@ -1588,56 +1585,46 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> Object transformClo = null; - if (drResolve) { - GridCacheVersion oldDrVer = version().drVersion(); - - boolean drNeedResolve = cctx.conflictNeedResolve(oldDrVer, drVer); - - if (drNeedResolve) { - // Get old value. - V oldVal = rawGetOrUnmarshalUnlocked(true); + // Request-level conflict resolution is needed, i.e. we do not know who will win in advance. + if (conflict.resolve()) { + GridCacheVersion oldConflictVer = version().conflictVersion(); + // Cache is conflict-enabled. + if (cctx.conflictNeedResolve()) { + // Get new value, optionally unmarshalling and/or transforming it. if (writeObj == null && valBytes != null) writeObj = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader()); if (op == GridCacheOperation.TRANSFORM) { transformClo = writeObj; - writeObj = ((IgniteClosure<V, V>)writeObj).apply(oldVal); - } - - K k = key(); - - if (drTtl >= 0L) { - // DR TTL is set explicitly - assert drExpireTime >= 0L; - - newTtl = drTtl; - newExpireTime = drExpireTime; + writeObj = ((IgniteClosure<V, V>)writeObj).apply(rawGetOrUnmarshalUnlocked(true)); + valBytes = null; } - else { - long ttl = expiryPlc != null ? (isNew() ? expiryPlc.forCreate() : expiryPlc.forUpdate()) : -1L; - newTtl = ttl < 0 ? ttlExtras() : ttl; - newExpireTime = CU.toExpireTime(newTtl); - } + // Get TTL and expire time (no special-purpose TTL values can be set for conflict). + assert conflict.ttl() != CU.TTL_ZERO && conflict.ttl() != CU.TTL_NOT_CHANGED && conflict.ttl() >= 0; + assert conflict.expireTime() != CU.EXPIRE_TIME_CALCULATE && conflict.expireTime() >= 0; + // Prepare old and new entries for conflict resolution. GridCacheVersionedEntryEx<K, V> oldEntry = versionedEntry(); - GridCacheVersionedEntryEx<K, V> newEntry = - new GridCachePlainVersionedEntry<>(k, (V)writeObj, newTtl, newExpireTime, drVer); + GridCacheVersionedEntryEx<K, V> newEntry = new GridCachePlainVersionedEntry<>(key, (V)writeObj, + conflict.ttl(), conflict.expireTime(), conflict.version()); - drRes = cctx.conflictResolve(oldEntry, newEntry, verCheck); + // Resolve conflict. + conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck); - assert drRes != null; + assert conflictCtx != null; - if (drRes.isUseOld()) { + // Use old value? + if (conflictCtx.isUseOld()) { // Handle special case with atomic comparator. - if (!isNew() && // Not initial value, - verCheck && // and atomic version check, - oldDrVer.dataCenterId() == drVer.dataCenterId() && // and data centers are equal, - ATOMIC_VER_COMPARATOR.compare(oldDrVer, drVer) == 0 && // and both versions are equal, - cctx.writeThrough() && // and store is enabled, - primary) // and we are primary. + if (!isNew() && // Not initial value, + verCheck && // and atomic version check, + oldConflictVer.dataCenterId() == conflict.version().dataCenterId() && // and data centers are equal, + ATOMIC_VER_COMPARATOR.compare(oldConflictVer, conflict.version()) == 0 && // and both versions are equal, + cctx.writeThrough() && // and store is enabled, + primary) // and we are primary. { V val = rawGetOrUnmarshalUnlocked(false); @@ -1650,47 +1637,43 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> cctx.store().putToStore(null, key(), val, ver); } - old = retval ? rawGetOrUnmarshalUnlocked(false) : val; - return new GridCacheUpdateAtomicResult<>(false, - old, + retval ? rawGetOrUnmarshalUnlocked(false) : null, + null, null, invokeRes, - 0L, - -1L, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, null, null, false); } - else if (drRes.isUseNew()) - op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; + // Will update something. else { - assert drRes.isMerge(); - - writeObj = drRes.mergeValue(); - valBytes = null; + // Merge is a local update which override passed value bytes. + if (conflictCtx.isMerge()) { + writeObj = conflictCtx.mergeValue(); + valBytes = null; - drVer = null; // Update will be considered as local. + conflict.clearVersion(); // Update will be considered as local. + } + else + assert conflictCtx.isUseNew(); + // Update value is known at this point, so update operation type. op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; } - newTtl = drRes.ttl(); - newExpireTime = drRes.expireTime(); - - // Explicit DR expire time will be passed to remote node only in that case. - if (!drRes.explicitTtl() && !drRes.isMerge()) { - if (drRes.isUseNew() && newEntry.dataCenterId() != cctx.dataCenterId() || - drRes.isUseOld() && oldEntry.dataCenterId() != cctx.dataCenterId()) - newDrExpireTime = drRes.expireTime(); - } + newTtl = conflictCtx.ttl(); + newExpireTime = conflictCtx.expireTime(); } else - // Nullify DR version on this update, so that we will use regular version during next updates. - drVer = null; + // Nullify conflict version on this update, so that we will use regular version during next updates. + conflict.clearVersion(); } - if (drRes == null) { // Perform version check only in case there will be no explicit conflict resolution. + // Perform version check only in case there was no explicit conflict resolution. + if (conflictCtx == null) { if (verCheck) { if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer) >= 0) { if (ATOMIC_VER_COMPARATOR.compare(ver, newVer) == 0 && cctx.writeThrough() && primary) { @@ -1714,14 +1697,13 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> "[entry=" + this + ", newVer=" + newVer + ']'); } - old = retval ? rawGetOrUnmarshalUnlocked(false) : val; - return new GridCacheUpdateAtomicResult<>(false, - old, + retval ? rawGetOrUnmarshalUnlocked(false) : null, + null, null, invokeRes, - -1L, - -1L, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, null, null, false); @@ -1732,46 +1714,48 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> "Invalid version for inner update [entry=" + this + ", newVer=" + newVer + ']'; } - // Possibly get old value form store. - old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; - - GridCacheValueBytes oldBytes = valueBytesUnlocked(); + // Prepare old value and value bytes. + oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; + GridCacheValueBytes oldValBytes = valueBytesUnlocked(); + // Possibly read value from store. boolean readThrough = false; - if (needVal && old == null && (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { - old = readThrough(null, key, false, subjId, taskName); + if (needVal && oldVal == null && (cctx.readThrough() && + (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { + oldVal = readThrough(null, key, false, subjId, taskName); readThrough = true; // Detach value before index update. if (cctx.portableEnabled()) - old = (V)cctx.kernalContext().portable().detachPortable(old); + oldVal = (V)cctx.kernalContext().portable().detachPortable(oldVal); - long ttl = 0; - long expireTime = 0; + // Calculate initial TTL and expire time. + long initTtl = 0; + long initExpireTime = 0; - if (expiryPlc != null && old != null) { - ttl = expiryPlc.forCreate(); + if (expiryPlc != null && oldVal != null) { + initTtl = expiryPlc.forCreate(); - if (ttl == CU.TTL_ZERO) { - ttl = CU.TTL_MINIMUM; - expireTime = CU.expireTimeInPast(); + if (initTtl == CU.TTL_ZERO) { + initTtl = CU.TTL_MINIMUM; + initExpireTime = CU.expireTimeInPast(); } - else if (ttl == CU.TTL_NOT_CHANGED) - ttl = 0; + else if (initTtl == CU.TTL_NOT_CHANGED) + initTtl = CU.TTL_ETERNAL; else - expireTime = CU.toExpireTime(ttl); + initExpireTime = CU.toExpireTime(initTtl); } - if (old != null) - updateIndex(old, null, expireTime, ver, null); + if (oldVal != null) + updateIndex(oldVal, null, initExpireTime, ver, null); else clearIndex(null); - update(old, null, expireTime, ttl, ver); + update(oldVal, null, initExpireTime, initTtl, ver); - if (deletedUnlocked() && old != null && !isInternal()) + if (deletedUnlocked() && oldVal != null && !isInternal()) deletedUnlocked(false); } @@ -1779,7 +1763,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) { // PutIfAbsent methods mustn't update hit/miss statistics if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noPeekArray()) - cctx.cache().metrics0().onRead(old != null); + cctx.cache().metrics0().onRead(oldVal != null); } // Check filter inside of synchronization. @@ -1791,24 +1775,27 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updateTtl(expiryPlc); return new GridCacheUpdateAtomicResult<>(false, - retval ? old : null, + retval ? oldVal : null, + null, null, invokeRes, - -1L, - -1L, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, null, null, false); } } - // Calculate new value. + // Calculate new value in case we met transform. if (op == GridCacheOperation.TRANSFORM) { + assert conflictCtx == null : "Cannot be TRANSFORM here is conflict resolution was performed earlier."; + transformClo = writeObj; EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)writeObj; - CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(cctx, key, old); + CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(cctx, key, oldVal); try { Object computed = entryProcessor.process(entry, invokeArgs); @@ -1823,9 +1810,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> catch (Exception e) { invokeRes = new CacheInvokeResult<>(e); - updated = old; + updated = oldVal; - valBytes = oldBytes.getIfMarshaled(); + valBytes = oldValBytes.getIfMarshaled(); } if (!entry.modified()) { @@ -1833,11 +1820,12 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updateTtl(expiryPlc); return new GridCacheUpdateAtomicResult<>(false, - retval ? old : null, + retval ? oldVal : null, + null, null, invokeRes, - -1L, - -1L, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, null, null, false); @@ -1852,30 +1840,29 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> boolean hadVal = hasValueUnlocked(); - // Incorporate DR version into new version if needed. - if (drVer != null && drVer != newVer) + // Incorporate conflict version into new version if needed. + if (conflict.version() != null && conflict.version() != newVer) newVer = new GridCacheVersionEx(newVer.topologyVersion(), newVer.globalTime(), newVer.order(), newVer.nodeOrder(), newVer.dataCenterId(), - drVer); + conflict.version()); - IgniteBiTuple<Boolean, V> interceptRes = null; - - long ttl0 = newTtl; if (op == GridCacheOperation.UPDATE) { - if (drRes == null) { + // Conflict context is null if there were no explicit conflict resolution. + if (conflictCtx == null) { // Calculate TTL and expire time for local update. - if (drTtl >= 0L) { - assert drExpireTime >= 0L : drExpireTime; + if (conflict.hasExplicitTtl()) { + // TTL/expireTime was sent to us from node where conflict had been resolved. + assert conflict.hasExplicitExpireTime() : conflict.expireTime(); - ttl0 = drTtl; - newExpireTime = drExpireTime; + newTtl = conflict.ttl(); + newExpireTime = conflict.expireTime(); } else { - assert drExpireTime == CU.TTL_NOT_CHANGED : drExpireTime; + assert !conflict.hasExplicitExpireTime() : conflict.expireTime(); if (expiryPlc != null) newTtl = hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate(); @@ -1883,36 +1870,50 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> newTtl = CU.TTL_NOT_CHANGED; if (newTtl == CU.TTL_NOT_CHANGED) { - ttl0 = ttlExtras(); + newTtl = ttlExtras(); newExpireTime = expireTimeExtras(); } - else { - ttl0 = newTtl; - newExpireTime = toExpireTime(ttl0); + else if (newTtl == CU.TTL_ZERO) { + op = GridCacheOperation.DELETE; + + // This is delete, so make TTL and expire time eternal. + newTtl = CU.TTL_ETERNAL; + newExpireTime = CU.EXPIRE_TIME_ETERNAL; + + updated = null; + valBytes = null; } + else + newExpireTime = CU.toExpireTime(newTtl); } } - else if (newTtl == CU.TTL_NOT_CHANGED) - ttl0 = ttlExtras(); } + else { + assert op == GridCacheOperation.DELETE; - if (ttl0 == CU.TTL_ZERO) { - op = GridCacheOperation.DELETE; - - updated = null; + newTtl = CU.TTL_ETERNAL; + newExpireTime = CU.EXPIRE_TIME_ETERNAL; } + // TTL and expire time must be resolved at this point. + assert newTtl != CU.TTL_NOT_CHANGED && newTtl != CU.TTL_ZERO && newTtl >= 0; + assert newExpireTime != CU.EXPIRE_TIME_CALCULATE && newExpireTime >= 0; + + IgniteBiTuple<Boolean, V> interceptRes = null; + + // Actual update. if (op == GridCacheOperation.UPDATE) { if (intercept) { - V interceptorVal = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated); + V interceptorVal = (V)cctx.config().getInterceptor().onBeforePut(key, oldVal, updated); if (interceptorVal == null) return new GridCacheUpdateAtomicResult<>(false, - retval ? old : null, + retval ? oldVal : null, + null, null, invokeRes, - -1L, - -1L, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, null, null, false); @@ -1948,9 +1949,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. - updateIndex(updated, valBytes, newExpireTime, newVer, old); + updateIndex(updated, valBytes, newExpireTime, newVer, oldVal); - update(updated, valBytes, newExpireTime, ttl0, newVer); + update(updated, valBytes, newExpireTime, newTtl, newVer); drReplicate(drType, updated, valBytes, newVer); @@ -1960,7 +1961,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> V evtOld = null; if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { - evtOld = cctx.unwrapTemporary(old); + evtOld = cctx.unwrapTemporary(oldVal); cctx.events().addEvent(partition(), key, evtNodeId, null, newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, @@ -1969,7 +1970,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { if (evtOld == null) - evtOld = cctx.unwrapTemporary(old); + evtOld = cctx.unwrapTemporary(oldVal); cctx.events().addEvent(partition(), key, evtNodeId, null, newVer, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld, @@ -1979,15 +1980,16 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } else { if (intercept) { - interceptRes = cctx.config().getInterceptor().onBeforeRemove(key, old); + interceptRes = cctx.config().getInterceptor().onBeforeRemove(key, oldVal); if (cctx.cancelRemove(interceptRes)) return new GridCacheUpdateAtomicResult<>(false, cctx.<V>unwrapTemporary(interceptRes.get2()), null, + null, invokeRes, - -1L, - -1L, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, null, null, false); @@ -1999,7 +2001,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. - clearIndex(old); + clearIndex(oldVal); if (hadVal) { assert !deletedUnlocked(); @@ -2024,7 +2026,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> boolean hasValPtr = valPtr != 0; // Clear value on backup. Entry will be removed from cache when it got evicted from queue. - update(null, null, 0, 0, newVer); + assert newTtl == CU.TTL_ETERNAL; + assert newExpireTime == CU.EXPIRE_TIME_ETERNAL; + + update(null, null, newTtl, newExpireTime, newVer); if (cctx.offheapTiered() && hasValPtr) { boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); @@ -2042,7 +2047,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> V evtOld = null; if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { - evtOld = cctx.unwrapTemporary(old); + evtOld = cctx.unwrapTemporary(oldVal); cctx.events().addEvent(partition(), key, evtNodeId, null, newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, @@ -2051,7 +2056,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { if (evtOld == null) - evtOld = cctx.unwrapTemporary(old); + evtOld = cctx.unwrapTemporary(oldVal); cctx.events().addEvent(partition(), key, evtNodeId, null, newVer, EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal, @@ -2060,17 +2065,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } res = hadVal; - - // Do not propagate zeroed TTL and expire time. - newTtl = -1L; - newDrExpireTime = -1L; } if (res) updateMetrics(op, metrics); if (cctx.isReplicated() || primary) - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, false); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), + oldVal, oldValBytes, false); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -2078,24 +2080,31 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (op == GridCacheOperation.UPDATE) cctx.config().getInterceptor().onAfterPut(key, val); else - cctx.config().getInterceptor().onAfterRemove(key, old); + cctx.config().getInterceptor().onAfterRemove(key, oldVal); if (interceptRes != null) - old = cctx.unwrapTemporary(interceptRes.get2()); + oldVal = cctx.unwrapTemporary(interceptRes.get2()); } } if (log.isDebugEnabled()) - log.debug("Updated cache entry [val=" + val + ", old=" + old + ", entry=" + this + ']'); + log.debug("Updated cache entry [val=" + val + ", old=" + oldVal + ", entry=" + this + ']'); + + // Ensure that TTL / expire stuff is not sent over wire when not needed. + if (!res || op == GridCacheOperation.DELETE) { + newTtl = CU.TTL_NOT_CHANGED; + newExpireTime = CU.EXPIRE_TIME_CALCULATE; + } return new GridCacheUpdateAtomicResult<>(res, - old, + oldVal, updated, + valBytes, invokeRes, newTtl, - newDrExpireTime, + newExpireTime, enqueueVer, - drRes, + conflictCtx, true); } @@ -2111,7 +2120,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> private void drReplicate(GridDrType drType, @Nullable V val, @Nullable byte[] valBytes, GridCacheVersion ver) throws IgniteCheckedException { if (cctx.isDrEnabled() && drType != DR_NONE && !isInternal()) - cctx.dr().replicate(key, keyBytes, val, valBytes, rawTtl(), rawExpireTime(), ver.drVersion(), drType); + cctx.dr().replicate(key, keyBytes, val, valBytes, rawTtl(), rawExpireTime(), ver.conflictVersion(), drType); } /** @@ -2473,7 +2482,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> GridCacheVersion ver) { assert ver != null; assert Thread.holdsLock(this); - assert ttl >= 0 : ttl; + assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl; long oldExpireTime = expireTimeExtras(); @@ -2537,7 +2546,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> expireTime = CU.expireTimeInPast(); } else - expireTime = toExpireTime(ttl); + expireTime = CU.toExpireTime(ttl); long oldExpireTime = expireTimeExtras(); @@ -2565,22 +2574,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } /** - * @param ttl Time to live. - * @return Expiration time. - */ - public static long toExpireTime(long ttl) { - assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0; - - long expireTime = ttl == CU.TTL_ETERNAL ? CU.EXPIRE_TIME_ETERNAL : U.currentTimeMillis() + ttl; - - // Account for overflow. - if (expireTime < 0) - expireTime = CU.EXPIRE_TIME_ETERNAL; - - return expireTime; - } - - /** * @throws GridCacheEntryRemovedException If entry is obsolete. */ protected void checkObsolete() throws GridCacheEntryRemovedException { @@ -3066,7 +3059,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> @Override public synchronized V rawPut(V val, long ttl) { V old = this.val; - update(val, null, toExpireTime(ttl), ttl, nextVersion()); + update(val, null, CU.toExpireTime(ttl), ttl, nextVersion()); return old; } @@ -3090,7 +3083,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> checkObsolete(); if (isNew() || (!preload && deletedUnlocked())) { - long expTime = expireTime < 0 ? toExpireTime(ttl) : expireTime; + long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; if (cctx.portableEnabled()) val = (V)cctx.kernalContext().portable().detachPortable(val); @@ -3167,7 +3160,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> boolean isNew = isStartVersion(); return new GridCachePlainVersionedEntry<>(key, isNew ? unswap(true, true) : rawGetOrUnmarshalUnlocked(false), - ttlExtras(), expireTimeExtras(), ver.drVersion(), isNew); + ttlExtras(), expireTimeExtras(), ver.conflictVersion(), isNew); } /** {@inheritDoc} */ @@ -3184,7 +3177,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> long ttl = ttlExtras(); - long expTime = toExpireTime(ttl); + long expTime = CU.toExpireTime(ttl); // Detach value before index update. if (cctx.portableEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java index 4e93122..7cb3de5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java @@ -112,7 +112,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @throws IgniteCheckedException If put operation failed. * @throws CacheFlagException If projection flags validation failed. */ - public void putAllDr(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException; + public void putAllConflict(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException; /** * Store DR data asynchronously. @@ -122,7 +122,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @throws IgniteCheckedException If put operation failed. * @throws CacheFlagException If projection flags validation failed. */ - public IgniteInternalFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException; + public IgniteInternalFuture<?> putAllConflictAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException; /** * Internal method that is called from {@link GridCacheEntryImpl}. @@ -154,7 +154,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @throws IgniteCheckedException If remove failed. * @throws CacheFlagException If projection flags validation failed. */ - public void removeAllDr(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException; + public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException; /** * Removes DR data asynchronously. @@ -164,7 +164,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @throws IgniteCheckedException If remove failed. * @throws CacheFlagException If projection flags validation failed. */ - public IgniteInternalFuture<?> removeAllDrAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException; + public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException; /** * Internal method that is called from {@link GridCacheEntryImpl}. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index eb854d5..041d222 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -794,14 +794,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public void putAllDr(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { - cache.putAllDr(drMap); + @Override public void putAllConflict(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { + cache.putAllConflict(drMap); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) + @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { - return cache.putAllDrAsync(drMap); + return cache.putAllConflictAsync(drMap); } /** {@inheritDoc} */ @@ -1130,13 +1130,13 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public void removeAllDr(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { - cache.removeAllDr(drMap); + @Override public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { + cache.removeAllConflict(drMap); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> removeAllDrAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { - return cache.removeAllDrAsync(drMap); + @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { + return cache.removeAllConflictAsync(drMap); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 2a653af..2fc59b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -738,11 +738,11 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public void putAllDr(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { + @Override public void putAllConflict(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - delegate.putAllDr(drMap); + delegate.putAllConflict(drMap); } finally { gate.leave(prev); @@ -750,12 +750,12 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) + @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.putAllDrAsync(drMap); + return delegate.putAllConflictAsync(drMap); } finally { gate.leave(prev); @@ -1454,11 +1454,11 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public void removeAllDr(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { + @Override public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - delegate.removeAllDr(drMap); + delegate.removeAllConflict(drMap); } finally { gate.leave(prev); @@ -1466,11 +1466,11 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> removeAllDrAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { + @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.removeAllDrAsync(drMap); + return delegate.removeAllConflictAsync(drMap); } finally { gate.leave(prev); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index cabfaa5..136e1bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -39,11 +39,14 @@ public class GridCacheUpdateAtomicResult<K, V> { @GridToStringInclude private final V newVal; + /** New value bytes. */ + private final byte[] newValBytes; + /** New TTL. */ private final long newTtl; /** Explicit DR expire time (if any). */ - private final long drExpireTime; + private final long conflictExpireTime; /** Version for deferred delete. */ @GridToStringInclude @@ -51,7 +54,7 @@ public class GridCacheUpdateAtomicResult<K, V> { /** DR resolution result. */ @GridToStringInclude - private final GridCacheVersionConflictContext<K, V> drRes; + private final GridCacheVersionConflictContext<K, V> conflictRes; /** Whether update should be propagated to DHT node. */ private final boolean sndToDht; @@ -65,30 +68,33 @@ public class GridCacheUpdateAtomicResult<K, V> { * @param success Success flag. * @param oldVal Old value. * @param newVal New value. + * @param newValBytes New value bytes. * @param res Value computed by the {@link EntryProcessor}. * @param newTtl New TTL. - * @param drExpireTime Explicit DR expire time (if any). + * @param conflictExpireTime Explicit DR expire time (if any). * @param rmvVer Version for deferred delete. - * @param drRes DR resolution result. + * @param conflictRes DR resolution result. * @param sndToDht Whether update should be propagated to DHT node. */ public GridCacheUpdateAtomicResult(boolean success, @Nullable V oldVal, @Nullable V newVal, + @Nullable byte[] newValBytes, @Nullable EntryProcessorResult<?> res, long newTtl, - long drExpireTime, + long conflictExpireTime, @Nullable GridCacheVersion rmvVer, - @Nullable GridCacheVersionConflictContext<K, V> drRes, + @Nullable GridCacheVersionConflictContext<K, V> conflictRes, boolean sndToDht) { this.success = success; this.oldVal = oldVal; this.newVal = newVal; + this.newValBytes = newValBytes; this.res = res; this.newTtl = newTtl; - this.drExpireTime = drExpireTime; + this.conflictExpireTime = conflictExpireTime; this.rmvVer = rmvVer; - this.drRes = drRes; + this.conflictRes = conflictRes; this.sndToDht = sndToDht; } @@ -121,17 +127,25 @@ public class GridCacheUpdateAtomicResult<K, V> { } /** - * @return {@code -1} if TTL did not change, otherwise new TTL. + * @return New value bytes. + */ + @Nullable public byte[] newValueBytes() { + return newValBytes; + } + + /** + * @return {@link GridCacheUtils#TTL_NOT_CHANGED} if TTL did not change, otherwise new TTL. */ public long newTtl() { return newTtl; } /** - * @return Explicit DR expire time (if any). + * @return Explicit conflict expire time (if any). Set only if it is necessary to propagate concrete expire time + * value to DHT node. Otherwise set to {@link GridCacheUtils#EXPIRE_TIME_CALCULATE}. */ - public long drExpireTime() { - return drExpireTime; + public long conflictExpireTime() { + return conflictExpireTime; } /** @@ -144,8 +158,8 @@ public class GridCacheUpdateAtomicResult<K, V> { /** * @return DR conflict resolution context. */ - @Nullable public GridCacheVersionConflictContext<K, V> drResolveResult() { - return drRes; + @Nullable public GridCacheVersionConflictContext<K, V> conflictResolveResult() { + return conflictRes; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index b4f306b..fc5130c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1614,15 +1614,15 @@ public class GridCacheUtils { * @return Expire time. */ public static long toExpireTime(long ttl) { - assert ttl >= 0L : ttl; + assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0; - if (ttl == 0L) - return 0L; - else { - long expireTime = U.currentTimeMillis() + ttl; + long expireTime = ttl == CU.TTL_ETERNAL ? CU.EXPIRE_TIME_ETERNAL : U.currentTimeMillis() + ttl; - return expireTime > 0L ? expireTime : 0L; - } + // Account for overflow. + if (expireTime < 0) + expireTime = CU.EXPIRE_TIME_ETERNAL; + + return expireTime; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInfo.java new file mode 100644 index 0000000..8f210cf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInfo.java @@ -0,0 +1,53 @@ +package org.apache.ignite.internal.processors.cache.conflict; + +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Cache conflict info which is passed over the wire. + */ +public abstract class GridCacheConflictInfo implements Externalizable { + /** + * Create conflict info. + * + * @param ver Version. + * @param ttl TTL. + * @param expireTime Expire time. + * @return Conflict info. + */ + public static GridCacheConflictInfo create(GridCacheVersion ver, long ttl, long expireTime) { + if (ttl == CU.TTL_NOT_CHANGED) { + assert expireTime == CU.EXPIRE_TIME_CALCULATE; + + return new GridCacheNoTtlConflictInfo(ver); + } + else { + assert ttl != CU.TTL_ZERO && ttl >= 0; + assert expireTime != CU.EXPIRE_TIME_CALCULATE && expireTime >= 0; + + return new GridCacheTtlConflictInfo(ver, ttl, expireTime); + } + } + + /** + * @return Version. + */ + public abstract GridCacheVersion version(); + + /** + * @return TTL. + */ + public abstract long ttl(); + + /** + * @return Expire time. + */ + public abstract long expireTime(); + + /** + * @return {@code True} if has expiration info. + */ + public abstract boolean hasExpirationInfo(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInnerUpdate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInnerUpdate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInnerUpdate.java new file mode 100644 index 0000000..75b7e3b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInnerUpdate.java @@ -0,0 +1,88 @@ +package org.apache.ignite.internal.processors.cache.conflict; + +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +/** + * Conflict inner update info. + */ +public class GridCacheConflictInnerUpdate { + /** Resolve flag. */ + private final boolean resolve; + + /** Version. */ + private GridCacheVersion ver; + + /** TTL. */ + private final long ttl; + + /** Expire time. */ + private final long expireTime; + + /** + * Conflict inner update info. + * + * @param resolve Resolve flag. + * @param ver Version. + * @param ttl TTL. + * @param expireTime Expire time. + */ + public GridCacheConflictInnerUpdate(boolean resolve, GridCacheVersion ver, long ttl, long expireTime) { + // TODO: IGNITE-283: Add assertion for invariants. + + this.resolve = resolve; + this.ver = ver; + this.ttl = ttl; + this.expireTime = expireTime; + } + + /** + * @return Resolve flag. + */ + public boolean resolve() { + return resolve; + } + + /** + * @return Version. + */ + @Nullable public GridCacheVersion version() { + return ver; + } + + /** + * Clear version so that update will be considered local. + */ + public void clearVersion() { + ver = null; + } + + /* + * @return TTL. + */ + public long ttl() { + return ttl; + } + + /** + * @return {@code True} if explicit TTL is set. + */ + public boolean hasExplicitTtl() { + return ttl != CU.TTL_NOT_CHANGED; + } + + /** + * @return {@code True} if explicit expire time is set. + */ + public boolean hasExplicitExpireTime() { + return expireTime != CU.EXPIRE_TIME_CALCULATE; + } + + /** + * @return Expire time. + */ + public long expireTime() { + return expireTime; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheNoTtlConflictInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheNoTtlConflictInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheNoTtlConflictInfo.java new file mode 100644 index 0000000..926feef --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheNoTtlConflictInfo.java @@ -0,0 +1,60 @@ +package org.apache.ignite.internal.processors.cache.conflict; + +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Conflict info without TTL. + */ +public class GridCacheNoTtlConflictInfo extends GridCacheConflictInfo { + /** Version. */ + private GridCacheVersion ver; + + /** + * {@link Externalizable} support. + */ + public GridCacheNoTtlConflictInfo() { + // No-op. + } + + /** + * Constructor. + * + * @param ver Version. + */ + public GridCacheNoTtlConflictInfo(GridCacheVersion ver) { + this.ver = ver; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return ver; + } + + /** {@inheritDoc} */ + @Override public long ttl() { + return CU.TTL_NOT_CHANGED; + } + + /** {@inheritDoc} */ + @Override public long expireTime() { + return CU.EXPIRE_TIME_CALCULATE; + } + + /** {@inheritDoc} */ + @Override public boolean hasExpirationInfo() { + return false; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ver); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ver = (GridCacheVersion)in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheTtlConflictInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheTtlConflictInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheTtlConflictInfo.java new file mode 100644 index 0000000..f4e6f29 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheTtlConflictInfo.java @@ -0,0 +1,78 @@ +package org.apache.ignite.internal.processors.cache.conflict; + +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.nio.*; + +/** + * Conflict info with TTL. + */ +public class GridCacheTtlConflictInfo extends GridCacheConflictInfo { + /** Version. */ + private GridCacheVersion ver; + + /** TTL. */ + private long ttl; + + /** Expire time. */ + private long expireTime; + + /** + * {@link Externalizable} support. + */ + public GridCacheTtlConflictInfo() { + // No-op. + } + + /** + * Constructor. + * + * @param ver Version. + * @param ttl TTL. + * @param expireTime Expire time. + */ + public GridCacheTtlConflictInfo(GridCacheVersion ver, long ttl, long expireTime) { + assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED; + assert expireTime != CU.EXPIRE_TIME_CALCULATE; + + this.ver = ver; + this.ttl = ttl; + this.expireTime = expireTime; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return ver; + } + + /** {@inheritDoc} */ + @Override public long ttl() { + return ttl; + } + + /** {@inheritDoc} */ + @Override public long expireTime() { + return expireTime; + } + + /** {@inheritDoc} */ + @Override public boolean hasExpirationInfo() { + return true; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ver); + out.writeLong(ttl); + out.writeLong(expireTime); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ver = (GridCacheVersion)in.readObject(); + ttl = in.readLong(); + expireTime = in.readLong(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 4f8357a..9c1d85f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -510,8 +510,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> if (txEntry.ttl() == CU.TTL_ZERO) op = DELETE; - boolean drNeedResolve = - cacheCtx.conflictNeedResolve(cached.version(), explicitVer); + boolean drNeedResolve = cacheCtx.conflictNeedResolve(); if (drNeedResolve) { IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>>