# ignite-92
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/30a67831 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/30a67831 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/30a67831 Branch: refs/heads/ignite-sql-tests Commit: 30a678312f03d98ce2e675b4180ef6c03ec3b197 Parents: 332ac70 Author: sboikov <semen.boi...@inria.fr> Authored: Wed Feb 11 20:29:53 2015 +0300 Committer: sboikov <semen.boi...@inria.fr> Committed: Thu Feb 12 00:05:09 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 130 +++++++--- .../processors/cache/GridCacheMapEntry.java | 204 +++++++++++----- .../processors/cache/GridCacheUtils.java | 12 +- .../distributed/GridCacheTtlUpdateRequest.java | 2 +- .../GridDistributedTxRemoteAdapter.java | 3 + .../IgniteExternalizableExpiryPolicy.java | 14 +- .../distributed/dht/GridDhtCacheAdapter.java | 31 ++- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../distributed/near/GridNearCacheAdapter.java | 4 +- .../cache/transactions/IgniteTxAdapter.java | 17 ++ .../transactions/IgniteTxLocalAdapter.java | 13 +- .../optimized/OptimizedClassDescriptor.java | 2 +- .../IgniteComputeTopologyExceptionTest.java | 112 +++++++++ ...iteCacheAtomicExpiryPolicyWithStoreTest.java | 49 ++++ .../IgniteCacheExpiryPolicyAbstractTest.java | 154 +++++++++++- .../IgniteCacheExpiryPolicyTestSuite.java | 3 + ...eCacheExpiryPolicyWithStoreAbstractTest.java | 236 +++++++++++++++++++ .../IgniteCacheTxExpiryPolicyWithStoreTest.java | 43 ++++ .../testsuites/IgniteComputeGridTestSuite.java | 1 + 19 files changed, 909 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3d5000b..2197238 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.processors.cache.affinity.*; +import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.dr.*; import org.apache.ignite.internal.processors.cache.query.*; @@ -3816,13 +3817,15 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final boolean replicate = ctx.isDrEnabled(); final long topVer = ctx.affinity().affinityTopologyVersion(); + final ExpiryPolicy plc = ctx.expiry(); + if (ctx.store().isLocalStore()) { IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false); try { ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); - LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, ttl); + LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, plc); ctx.store().loadCache(c, args); @@ -3841,6 +3844,17 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, throws IgniteException { assert ver == null; + long ttl = 0; + + if (plc != null) { + ttl = CU.toTtl(plc.getExpiryForCreation()); + + if (ttl == CU.TTL_ZERO) + return; + else if (ttl == CU.TTL_NOT_CHANGED) + ttl = 0; + } + loadEntry(key, val, ver0, p, topVer, replicate, ttl); } }, args); @@ -3916,6 +3930,10 @@ public abstract 
class GridCacheAdapter<K, V> implements GridCache<K, V>, if (!ctx.store().configured()) return new GridFinishedFuture<>(ctx.kernalContext()); + GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); + + ExpiryPolicy plc = prj != null ? prj.expiry() : null; + if (replaceExisting) { if (ctx.store().isLocalStore()) { Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).nodes(); @@ -3924,7 +3942,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return new GridFinishedFuture<>(ctx.kernalContext()); return ctx.closures().callAsyncNoFailover(BROADCAST, - new LoadKeysCallable<>(ctx.name(), keys, true), + new LoadKeysCallable<>(ctx.name(), keys, true, plc), nodes, true); } @@ -3945,7 +3963,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return new GridFinishedFuture<>(ctx.kernalContext()); return ctx.closures().callAsyncNoFailover(BROADCAST, - new LoadKeysCallable<>(ctx.name(), keys, false), + new LoadKeysCallable<>(ctx.name(), keys, false, plc), nodes, true); } @@ -3985,19 +4003,25 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** * @param keys Keys to load. + * @param plc Optional expiry policy. * @throws IgniteCheckedException If failed. */ - public void localLoad(Collection<? extends K> keys) throws IgniteCheckedException { + public void localLoad(Collection<? extends K> keys, + @Nullable ExpiryPolicy plc) + throws IgniteCheckedException + { final boolean replicate = ctx.isDrEnabled(); final long topVer = ctx.affinity().affinityTopologyVersion(); + final ExpiryPolicy plc0 = plc != null ? 
plc : ctx.expiry(); + if (ctx.store().isLocalStore()) { IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false); try { ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); - LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, 0); + LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, plc0); ctx.store().localStoreLoadAll(null, keys, c); @@ -4013,7 +4037,18 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, ctx.store().loadAllFromStore(null, keys, new CI2<K, V>() { @Override public void apply(K key, V val) { - loadEntry(key, val, ver0, null, topVer, replicate, 0); + long ttl = 0; + + if (plc0 != null) { + ttl = CU.toTtl(plc0.getExpiryForCreation()); + + if (ttl == CU.TTL_ZERO) + return; + else if (ttl == CU.TTL_NOT_CHANGED) + ttl = 0; + } + + loadEntry(key, val, ver0, null, topVer, replicate, ttl); } }); } @@ -5923,10 +5958,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** * */ - protected static class GetExpiryPolicy implements IgniteCacheExpiryPolicy { - /** */ - private final long accessTtl; - + protected abstract static class GetExpiryPolicy implements IgniteCacheExpiryPolicy { /** */ private Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries; @@ -5937,30 +5969,30 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param expiryPlc Expiry policy. * @return Access expire policy. */ - public static GetExpiryPolicy forPolicy(@Nullable ExpiryPolicy expiryPlc) { + @Nullable public static GetExpiryPolicy forPolicy(@Nullable final ExpiryPolicy expiryPlc) { if (expiryPlc == null) return null; - Duration duration = expiryPlc.getExpiryForAccess(); - - if (duration == null) - return null; - - return new GetExpiryPolicy(CU.toTtl(duration)); + return new GetExpiryPolicy() { + @Override public long forAccess() { + return CU.toTtl(expiryPlc.getExpiryForAccess()); + } + }; } /** - * @param accessTtl TTL for access. 
+ * @param ttl Access TTL. + * @return Access expire policy. */ - public GetExpiryPolicy(long accessTtl) { - assert accessTtl >= 0 : accessTtl; - - this.accessTtl = accessTtl; - } + @Nullable public static GetExpiryPolicy forTtl(final long ttl) { + if (ttl == CU.TTL_NOT_CHANGED) + return null; - /** {@inheritDoc} */ - @Override public long forAccess() { - return accessTtl; + return new GetExpiryPolicy() { + @Override public long forAccess() { + return ttl; + } + }; } /** {@inheritDoc} */ @@ -6052,6 +6084,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** Update flag. */ private boolean update; + /** */ + private ExpiryPolicy plc; + /** * Required by {@link Externalizable}. */ @@ -6063,12 +6098,16 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param cacheName Cache name. * @param keys Keys. * @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)} - * otherwise {@link #localLoad(Collection)}. + * otherwise {@link #localLoad(Collection, ExpiryPolicy)}. */ - LoadKeysCallable(String cacheName, Collection<? extends K> keys, boolean update) { + LoadKeysCallable(String cacheName, + Collection<? extends K> keys, + boolean update, + ExpiryPolicy plc) { this.cacheName = cacheName; this.keys = keys; this.update = update; + this.plc = plc; } /** {@inheritDoc} */ @@ -6083,7 +6122,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (update) cache.localLoadAndUpdate(keys); else - cache.localLoad(keys); + cache.localLoad(keys, plc); } finally { cache.context().gate().leave(); @@ -6099,6 +6138,8 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, U.writeCollection(out, keys); out.writeBoolean(update); + + out.writeObject(plc != null ? 
new IgniteExternalizableExpiryPolicy(plc) : null); } /** {@inheritDoc} */ @@ -6108,6 +6149,8 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, keys = U.readCollection(in); update = in.readBoolean(); + + plc = (ExpiryPolicy)in.readObject(); } } @@ -6125,17 +6168,19 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final IgniteDataLoaderImpl<K, V> ldr; /** */ - final long ttl; + final ExpiryPolicy plc; /** * @param p Key/value predicate. * @param ldr Loader. - * @param ttl TTL. + * @param plc Optional expiry policy. */ - private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p, IgniteDataLoaderImpl<K, V> ldr, long ttl) { + private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p, + IgniteDataLoaderImpl<K, V> ldr, + @Nullable ExpiryPolicy plc) { this.p = p; this.ldr = ldr; - this.ttl = ttl; + this.plc = plc; col = new ArrayList<>(ldr.perNodeBufferSize()); } @@ -6147,12 +6192,29 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (p != null && !p.apply(key, val)) return; + long ttl = 0; + + if (plc != null) { + ttl = CU.toTtl(plc.getExpiryForCreation()); + + if (ttl == CU.TTL_ZERO) + return; + else if (ttl == CU.TTL_NOT_CHANGED) + ttl = 0; + } + if (ctx.portableEnabled()) { key = (K)ctx.marshalToPortable(key); val = (V)ctx.marshalToPortable(val); } - GridCacheRawVersionedEntry<K,V> e = new GridCacheRawVersionedEntry<>(key, null, val, null, ttl, 0, ver); + GridCacheRawVersionedEntry<K,V> e = new GridCacheRawVersionedEntry<>(key, + null, + val, + null, + ttl, + 0, + ver); e.marshal(ctx.marshaller()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index ca7b740..64c83fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -817,12 +817,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (ret != null && expiryPlc != null) { long ttl = expiryPlc.forAccess(); - updateTtl(ttl); + if (ttl != CU.TTL_NOT_CHANGED) { + updateTtl(ttl); - expiryPlc.ttlUpdated(key(), - getOrMarshalKeyBytes(), - version(), - hasReaders() ? ((GridDhtCacheEntry) this).readers() : null); + expiryPlc.ttlUpdated(key(), + getOrMarshalKeyBytes(), + version(), + hasReaders() ? ((GridDhtCacheEntry)this).readers() : null); + } } } @@ -1132,8 +1134,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> expireTime = toExpireTime(ttl); } - assert ttl >= 0; - assert expireTime >= 0; + assert ttl >= 0 : ttl; + assert expireTime >= 0 : expireTime; // Detach value before index update. 
if (cctx.portableEnabled()) @@ -1424,19 +1426,38 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> GridCacheValueBytes oldBytes = valueBytesUnlocked(); - if (needVal && old == null && (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { + boolean readThrough = false; + + if (needVal && old == null && + (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { old = readThrough(null, key, false, CU.<K, V>empty(), subjId, taskName); + long ttl = 0; + long expireTime = 0; + + if (expiryPlc != null && old != null) { + ttl = CU.toTtl(expiryPlc.getExpiryForCreation()); + + if (ttl == CU.TTL_ZERO) { + ttl = 1; + expireTime = U.currentTimeMillis() - 1; + } + else if (ttl == CU.TTL_NOT_CHANGED) + ttl = 0; + else + expireTime = CU.toExpireTime(ttl); + } + // Detach value before index update. if (cctx.portableEnabled()) old = (V)cctx.kernalContext().portable().detachPortable(old); if (old != null) - updateIndex(old, null, expireTime(), ver, null); + updateIndex(old, null, expireTime, ver, null); else clearIndex(null); - update(old, null, 0, 0, ver); + update(old, null, expireTime, ttl, ver); } // Apply metrics. 
@@ -1451,10 +1472,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> boolean pass = cctx.isAll(wrapFilterLocked(), filter); if (!pass) { - if (expiryPlc != null && hasValueUnlocked()) { + if (expiryPlc != null && !readThrough && filter != cctx.noPeekArray() && hasValueUnlocked()) { long ttl = CU.toTtl(expiryPlc.getExpiryForAccess()); - if (ttl != -1L) + if (ttl != CU.TTL_NOT_CHANGED) updateTtl(ttl); } @@ -1489,8 +1510,16 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> invokeRes = new CacheInvokeResult<>(e); } - if (!entry.modified()) + if (!entry.modified()) { + if (expiryPlc != null && !readThrough && hasValueUnlocked()) { + long newTtl = CU.toTtl(expiryPlc.getExpiryForAccess()); + + if (newTtl != CU.TTL_NOT_CHANGED) + updateTtl(newTtl); + } + return new GridTuple3<>(false, null, invokeRes); + } } else updated = (V)writeObj; @@ -1514,23 +1543,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> boolean hadVal = hasValueUnlocked(); - // Try write-through. - if (op == GridCacheOperation.UPDATE) { - // Detach value before index update. - if (cctx.portableEnabled()) - updated = (V)cctx.kernalContext().portable().detachPortable(updated); - - if (writeThrough) - // Must persist inside synchronization in non-tx mode. - cctx.store().putToStore(null, key, updated, ver); - - long ttl; - long expireTime; + long ttl = 0; + long expireTime = 0; + if (op == GridCacheOperation.UPDATE) { if (expiryPlc != null) { ttl = CU.toTtl(hadVal ? expiryPlc.getExpiryForUpdate() : expiryPlc.getExpiryForCreation()); - if (ttl == -1L) { + if (ttl == CU.TTL_NOT_CHANGED) { ttl = ttlExtras(); expireTime = expireTimeExtras(); @@ -1543,6 +1563,20 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> expireTime = expireTimeExtras(); } + } + + if (ttl == CU.TTL_ZERO) + op = GridCacheOperation.DELETE; + + // Try write-through. 
+ if (op == GridCacheOperation.UPDATE) { + // Detach value before index update. + if (cctx.portableEnabled()) + updated = (V)cctx.kernalContext().portable().detachPortable(updated); + + if (writeThrough) + // Must persist inside synchronization in non-tx mode. + cctx.store().putToStore(null, key, updated, ver); // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. @@ -1674,9 +1708,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (isNew()) unswap(true, retval); - boolean newTtlResolved = false; - - boolean drNeedResolve = false; + boolean drNeedResolve; Object transformClo = null; @@ -1714,8 +1746,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> newExpireTime = CU.toExpireTime(newTtl); } - newTtlResolved = true; - GridCacheVersionedEntryEx<K, V> oldEntry = versionedEntry(); GridCacheVersionedEntryEx<K, V> newEntry = new GridCachePlainVersionedEntry<>(k, (V)writeObj, newTtl, newExpireTime, drVer); @@ -1831,19 +1861,39 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> GridCacheValueBytes oldBytes = valueBytesUnlocked(); + boolean readThrough = false; + if (needVal && old == null && (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { old = readThrough(null, key, false, CU.<K, V>empty(), subjId, taskName); + readThrough = true; + // Detach value before index update. 
if (cctx.portableEnabled()) old = (V)cctx.kernalContext().portable().detachPortable(old); + long ttl = 0; + long expireTime = 0; + + if (expiryPlc != null && old != null) { + ttl = expiryPlc.forCreate(); + + if (ttl == CU.TTL_ZERO) { + ttl = 1; + expireTime = U.currentTimeMillis() - 1; + } + else if (ttl == CU.TTL_NOT_CHANGED) + ttl = 0; + else + expireTime = CU.toExpireTime(ttl); + } + if (old != null) - updateIndex(old, null, expireTime(), ver, null); + updateIndex(old, null, expireTime, ver, null); else clearIndex(null); - update(old, null, 0, 0, ver); + update(old, null, expireTime, ttl, ver); if (deletedUnlocked() && old != null && !isInternal()) deletedUnlocked(false); @@ -1861,16 +1911,16 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> boolean pass = cctx.isAll(wrapFilterLocked(), filter); if (!pass) { - if (hasValueUnlocked() && expiryPlc != null) { + if (expiryPlc != null && !readThrough && hasValueUnlocked() && filter != cctx.noPeekArray()) { newTtl = expiryPlc.forAccess(); - if (newTtl != -1L) { + if (newTtl != CU.TTL_NOT_CHANGED) { updateTtl(newTtl); expiryPlc.ttlUpdated(key, getOrMarshalKeyBytes(), version(), - hasReaders() ? ((GridDhtCacheEntry) this).readers() : null); + hasReaders() ? ((GridDhtCacheEntry)this).readers() : null); } } @@ -1913,6 +1963,19 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } if (!entry.modified()) { + if (expiryPlc != null && !readThrough && hasValueUnlocked()) { + newTtl = expiryPlc.forAccess(); + + if (newTtl != CU.TTL_NOT_CHANGED) { + updateTtl(newTtl); + + expiryPlc.ttlUpdated(key, + getOrMarshalKeyBytes(), + version(), + hasReaders() ? ((GridDhtCacheEntry)this).readers() : null); + } + } + return new GridCacheUpdateAtomicResult<>(false, retval ? 
old : null, null, @@ -1944,28 +2007,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> IgniteBiTuple<Boolean, V> interceptRes = null; - if (op == GridCacheOperation.UPDATE) { - if (intercept) { - V interceptorVal = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated); - - if (interceptorVal == null) - return new GridCacheUpdateAtomicResult<>(false, - retval ? old : null, - null, - invokeRes, - -1L, - -1L, - null, - null, - false); - else if (interceptorVal != updated) { - updated = cctx.unwrapTemporary(interceptorVal); - valBytes = null; - } - } - - long ttl0 = newTtl; + long ttl0 = newTtl; + if (op == GridCacheOperation.UPDATE) { if (drRes == null) { // Calculate TTL and expire time for local update. if (drTtl >= 0L) { @@ -1975,14 +2019,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> newExpireTime = drExpireTime; } else { - assert drExpireTime == -1L : drExpireTime; + assert drExpireTime == CU.TTL_NOT_CHANGED : drExpireTime; if (expiryPlc != null) newTtl = hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate(); else - newTtl = -1L; + newTtl = CU.TTL_NOT_CHANGED; - if (newTtl == -1L) { + if (newTtl == CU.TTL_NOT_CHANGED) { ttl0 = ttlExtras(); newExpireTime = expireTimeExtras(); } @@ -1992,8 +2036,35 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } } } - else if (newTtl == -1L) + else if (newTtl == CU.TTL_NOT_CHANGED) ttl0 = ttlExtras(); + } + + if (ttl0 == CU.TTL_ZERO) { + op = GridCacheOperation.DELETE; + + updated = null; + } + + if (op == GridCacheOperation.UPDATE) { + if (intercept) { + V interceptorVal = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated); + + if (interceptorVal == null) + return new GridCacheUpdateAtomicResult<>(false, + retval ? 
old : null, + null, + invokeRes, + -1L, + -1L, + null, + null, + false); + else if (interceptorVal != updated) { + updated = cctx.unwrapTemporary(interceptorVal); + valBytes = null; + } + } // Try write-through. if (writeThrough) @@ -2567,10 +2638,17 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> * @param ttl Time to live. */ private void updateTtl(long ttl) { - assert ttl >= 0 : ttl; + assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl; assert Thread.holdsLock(this); - long expireTime = toExpireTime(ttl); + long expireTime; + + if (ttl == CU.TTL_ZERO) { + ttl = 1; + expireTime = U.currentTimeMillis() - 1; + } + else + expireTime = toExpireTime(ttl); long oldExpireTime = expireTimeExtras(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index adf5c07..531ea3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -72,6 +72,12 @@ public class GridCacheUtils { /** Peek flags. */ private static final GridCachePeekMode[] PEEK_FLAGS = new GridCachePeekMode[] { GLOBAL, SWAP }; + /** */ + public static final long TTL_NOT_CHANGED = -1L; + + /** */ + public static final long TTL_ZERO = -2L; + /** Per-thread generated UID store. 
*/ private static final ThreadLocal<String> UUIDS = new ThreadLocal<String>() { @Override protected String initialValue() { @@ -1678,7 +1684,7 @@ public class GridCacheUtils { */ public static long toTtl(Duration duration) { if (duration == null) - return -1; + return TTL_NOT_CHANGED; if (duration.getDurationAmount() == 0) { if (duration.isEternal()) @@ -1686,10 +1692,10 @@ public class GridCacheUtils { assert duration.isZero(); - return 1L; + return TTL_ZERO; } - assert duration.getTimeUnit() != null; + assert duration.getTimeUnit() != null : duration; return duration.getTimeUnit().toMillis(duration.getDurationAmount()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java index 00aae95..a36ceab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java @@ -79,7 +79,7 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { * @param ttl TTL. 
*/ public GridCacheTtlUpdateRequest(long topVer, long ttl) { - assert ttl >= 0 : ttl; + assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl; this.topVer = topVer; this.ttl = ttl; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index be65c00..36914b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -505,6 +505,9 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> GridCacheVersion explicitVer = txEntry.drVersion(); + if (txEntry.ttl() == CU.TTL_ZERO) + op = DELETE; + if (finalizationStatus() == FinalizationStatus.RECOVERY_FINISH || optimistic()) { // Primary node has left the grid so we have to process conflicts on backups. 
if (explicitVer == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java index afaac97..f4caaca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java @@ -100,10 +100,12 @@ public class IgniteExternalizableExpiryPolicy implements ExpiryPolicy, Externali */ private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException { if (duration != null) { - if (duration.isEternal()) - out.writeLong(0); - else if (duration.getDurationAmount() == 0) - out.writeLong(1); + if (duration.getDurationAmount() == 0L) { + if (duration.isEternal()) + out.writeLong(0); + else + out.writeLong(CU.TTL_ZERO); + } else out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount())); } @@ -117,10 +119,12 @@ public class IgniteExternalizableExpiryPolicy implements ExpiryPolicy, Externali private Duration readDuration(ObjectInput in) throws IOException { long ttl = in.readLong(); - assert ttl >= 0; + assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl; if (ttl == 0) return Duration.ETERNAL; + else if (ttl == CU.TTL_ZERO) + return Duration.ZERO; return new Duration(TimeUnit.MILLISECONDS, ttl); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index f4d2d0e..0074092 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -36,6 +36,7 @@ import org.jdk8.backport.*; import org.jetbrains.annotations.*; import javax.cache.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -358,9 +359,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** {@inheritDoc} */ - @Override public void localLoad(Collection<? extends K> keys) throws IgniteCheckedException { + @Override public void localLoad(Collection<? extends K> keys, final ExpiryPolicy plc) + throws IgniteCheckedException { if (ctx.store().isLocalStore()) { - super.localLoad(keys); + super.localLoad(keys, plc); return; } @@ -372,9 +374,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap final long topVer = ctx.affinity().affinityTopologyVersion(); + final ExpiryPolicy plc0 = plc != null ? 
plc : ctx.expiry(); + ctx.store().loadAllFromStore(null, keys, new CI2<K, V>() { @Override public void apply(K key, V val) { - loadEntry(key, val, ver0, null, topVer, replicate, 0); + loadEntry(key, val, ver0, null, topVer, replicate, plc0); } }); } @@ -394,11 +398,13 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap final long topVer = ctx.affinity().affinityTopologyVersion(); + final ExpiryPolicy plc = ctx.expiry(); + ctx.store().loadCache(new CI3<K, V, GridCacheVersion>() { @Override public void apply(K key, V val, @Nullable GridCacheVersion ver) { assert ver == null; - loadEntry(key, val, ver0, p, topVer, replicate, ttl); + loadEntry(key, val, ver0, p, topVer, replicate, plc); } }, args); } @@ -410,7 +416,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param p Optional predicate. * @param topVer Topology version. * @param replicate Replication flag. - * @param ttl TTL. + * @param plc Expiry policy. */ private void loadEntry(K key, V val, @@ -418,7 +424,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Nullable IgniteBiPredicate<K, V> p, long topVer, boolean replicate, - long ttl) { + @Nullable ExpiryPolicy plc) { if (p != null && !p.apply(key, val)) return; @@ -430,6 +436,17 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap GridCacheEntryEx<K, V> entry = null; try { + long ttl = 0; + + if (plc != null) { + ttl = CU.toTtl(plc.getExpiryForCreation()); + + if (ttl == CU.TTL_ZERO) + return; + else if (ttl == CU.TTL_NOT_CHANGED) + ttl = 0; + } + if (ctx.portableEnabled()) { key = (K)ctx.marshalToPortable(key); val = (V)ctx.marshalToPortable(val); @@ -604,7 +621,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap long ttl = req.accessTtl(); - final GetExpiryPolicy expiryPlc = ttl == -1L ? 
null : new GetExpiryPolicy(ttl); + final GetExpiryPolicy expiryPlc = GetExpiryPolicy.forTtl(ttl); IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>> fut = getDhtAsync(nodeId, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index cf67c15..61e14a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1285,7 +1285,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (expiry != null && entry.hasValue()) { long ttl = expiry.forAccess(); - if (ttl != -1L) { + if (ttl != CU.TTL_NOT_CHANGED) { entry.updateTtl(null, ttl); expiry.ttlUpdated(entry.key(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 74d0f1e..b426d73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -316,8 +316,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public void localLoad(Collection<? extends K> keys) throws IgniteCheckedException { - dht().localLoad(keys); + @Override public void localLoad(Collection<? extends K> keys, ExpiryPolicy plc) throws IgniteCheckedException { + dht().localLoad(keys, plc); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index eb5e662..069bcdb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -34,6 +34,7 @@ import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import javax.cache.processor.*; import java.io.*; import java.util.*; @@ -1222,6 +1223,22 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter GridCacheOperation op = modified ? (val == null ? 
DELETE : UPDATE) : NOOP; + if (op == NOOP) { + ExpiryPolicy expiry = txEntry.expiry(); + + if (expiry == null) + expiry = cacheCtx.expiry(); + + if (expiry != null) { + long ttl = CU.toTtl(expiry.getExpiryForAccess()); + + txEntry.ttl(ttl); + + if (ttl == CU.TTL_ZERO) + op = DELETE; + } + } + return F.t(op, (V)cacheCtx.<V>unwrapTemporary(val), null); } catch (GridCacheFilterFailedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index f985753..5e9fbf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -730,7 +730,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> Duration duration = cached.hasValue() ? 
expiry.getExpiryForUpdate() : expiry.getExpiryForCreation(); - txEntry.ttl(CU.toTtl(duration)); + long ttl = CU.toTtl(duration); + + txEntry.ttl(ttl); + + if (ttl == CU.TTL_ZERO) + op = DELETE; } } @@ -880,7 +885,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> "Transaction does not own lock for group lock entry during commit [tx=" + this + ", txEntry=" + txEntry + ']'; - if (txEntry.ttl() != -1L) + if (txEntry.ttl() != CU.TTL_NOT_CHANGED) cached.updateTtl(null, txEntry.ttl()); if (log.isDebugEnabled()) @@ -1596,7 +1601,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (expiryPlc == null) expiryPlc = cacheCtx.expiry(); - long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : -1L; + long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED; IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys, lockTimeout(), @@ -2402,7 +2407,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> txEntry.filters(CU.<K, V>empty()); txEntry.filtersSet(false); - updateTtl = true; + updateTtl = filter != cacheCtx.noPeekArray(); } if (updateTtl) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java index 0f746f8..cbbba5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java @@ -814,7 +814,7 @@ class OptimizedClassDescriptor { case TYPE_SERIALIZABLE: if 
(out.requireSerializable() && !isSerial) throw new NotSerializableException("Must implement java.io.Serializable or " + - "set GridOptimizedMarshaller.setRequireSerializable() to false " + + "set OptimizedMarshaller.setRequireSerializable() to false " + "(note that performance may degrade if object is not Serializable): " + name); out.writeSerializable(obj, writeObjMtds, fields); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java new file mode 100644 index 0000000..2db8062 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.internal.GridClosureCallMode.*; + +/** + * Checks that a compute call whose only target node has been stopped fails with a cluster topology exception. + */ +public class IgniteComputeTopologyExceptionTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(new OptimizedMarshaller(false)); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testCorrectException() throws Exception { + Ignite ignite = ignite(0); + + IgniteCompute comp = ignite.compute(ignite.cluster().forRemotes()).withNoFailover(); + + stopGrid(1); + + try { + comp.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + fail("Should not be called."); + + return null; + } + }); + + fail(); + } + catch (ClusterTopologyException e) { + log.info("Expected exception: " + e); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testCorrectCheckedException() throws Exception { + IgniteKernal ignite0 = (IgniteKernal)ignite(0); + + Collection<ClusterNode> nodes = F.asList(ignite(1).cluster().localNode()); + + stopGrid(1); + + IgniteInternalFuture<?> fut = ignite0.context().closure().callAsyncNoFailover(BALANCE, + new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + fail("Should not be called."); + + return null; + } + }, + nodes, + false); + + try { + fut.get(); + + fail(); + } + catch (ClusterTopologyCheckedException e) { + log.info("Expected exception: " + e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyWithStoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyWithStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyWithStoreTest.java new file mode 100644 index 0000000..a638ed2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyWithStoreTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.expiry; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Expiry-policy-with-store tests for a PARTITIONED ATOMIC cache with PRIMARY write order mode on 3 grids. + */ +public class IgniteCacheAtomicExpiryPolicyWithStoreTest extends IgniteCacheExpiryPolicyWithStoreAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicWriteOrderMode atomicWriteOrderMode() { + return PRIMARY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index 0990929..59e8bc4 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -32,6 +32,7 @@ import org.jetbrains.annotations.*; import javax.cache.configuration.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.util.*; import java.util.concurrent.*; @@ -73,6 +74,125 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs /** * @throws Exception If failed. */ + public void testZeroOnCreate() throws Exception { + factory = CreatedExpiryPolicy.factoryOf(Duration.ZERO); + + startGrids(); + + for (final Integer key : keys()) { + log.info("Test zero duration on create, key: " + key); + + zeroOnCreate(key); + } + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void zeroOnCreate(Integer key) throws Exception { + IgniteCache<Integer, Integer> cache = jcache(); + + cache.put(key, 1); // Create with zero duration, should not create cache entry. + + checkNoValue(F.asList(key)); + } + + /** + * @throws Exception If failed. + */ + public void testZeroOnUpdate() throws Exception { + factory = new Factory<ExpiryPolicy>() { + @Override public ExpiryPolicy create() { + return new ExpiryPolicy() { + @Override public Duration getExpiryForCreation() { + return null; + } + + @Override public Duration getExpiryForAccess() { + return null; + } + + @Override public Duration getExpiryForUpdate() { + return Duration.ZERO; + } + }; + } + }; + + startGrids(); + + for (final Integer key : keys()) { + log.info("Test zero duration on update, key: " + key); + + zeroOnUpdate(key); + } + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void zeroOnUpdate(Integer key) throws Exception { + IgniteCache<Integer, Integer> cache = jcache(); + + cache.put(key, 1); // Create. + + assertEquals((Integer)1, cache.get(key)); + + cache.put(key, 2); // Update should expire entry. + + checkNoValue(F.asList(key)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testZeroOnAccess() throws Exception { + factory = new Factory<ExpiryPolicy>() { + @Override public ExpiryPolicy create() { + return new ExpiryPolicy() { + @Override public Duration getExpiryForCreation() { + return null; + } + + @Override public Duration getExpiryForAccess() { + return Duration.ZERO; + } + + @Override public Duration getExpiryForUpdate() { + return null; + } + }; + } + }; + + startGrids(); + + for (final Integer key : keys()) { + log.info("Test zero duration on access, key: " + key); + + zeroOnAccess(key); + } + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void zeroOnAccess(Integer key) throws Exception { + IgniteCache<Integer, Integer> cache = jcache(); + + cache.put(key, 1); // Create. + + assertEquals((Integer)1, cache.get(key)); // Access should expire entry. + + waitExpired(F.asList(key)); + } + + /** + * @throws Exception If failed. + */ public void testEternal() throws Exception { factory = EternalExpiryPolicy.factoryOf(); @@ -266,11 +386,23 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs checkTtl(key, 62_000L, true); - assertEquals((Integer)1, cache.withExpiryPolicy(new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE)).get(key)); + IgniteCache<Integer, Integer> cache0 = cache.withExpiryPolicy(new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE)); + + assertEquals((Integer)1, cache0.get(key)); checkTtl(key, TTL_FOR_EXPIRE, true); waitExpired(key); + + cache.put(key, 1); + + checkTtl(key, 60_000L); + + Integer res = cache.invoke(key, new GetEntryProcessor()); + + assertEquals((Integer)1, res); + + checkTtl(key, 62_000L, true); } /** @@ -810,6 +942,14 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs } }, 3000); + checkNoValue(keys); + } + + /** + * @param keys Keys. + * @throws Exception If failed. 
+ */ + private void checkNoValue(Collection<Integer> keys) throws Exception { IgniteCache<Integer, Object> cache = jcache(0); for (int i = 0; i < gridCount(); i++) { @@ -886,7 +1026,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs }, 3000); } - assertEquals("Unexpected ttl [grid=" + i + ", key=" + key +']', ttl, e.ttl()); + assertEquals("Unexpected ttl [node=" + i + ", key=" + key +']', ttl, e.ttl()); if (ttl > 0) assertTrue(e.expireTime() > 0); @@ -913,6 +1053,16 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs /** * */ + private static class GetEntryProcessor implements EntryProcessor<Integer, Integer, Integer> { + /** {@inheritDoc} */ + @Override public Integer process(MutableEntry<Integer, Integer> e, Object... args) { + return e.getValue(); + } + } + + /** + * + */ private class TestPolicy implements ExpiryPolicy { /** */ private Long create; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java index 775511e..233ae2b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java @@ -42,6 +42,9 @@ public class IgniteCacheExpiryPolicyTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheTxWithStoreExpiryPolicyTest.class); suite.addTestSuite(IgniteCacheTxReplicatedExpiryPolicyTest.class); + suite.addTestSuite(IgniteCacheAtomicExpiryPolicyWithStoreTest.class); 
+ suite.addTestSuite(IgniteCacheTxExpiryPolicyWithStoreTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java new file mode 100644 index 0000000..c09e3db --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.expiry; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import javax.cache.configuration.*; +import javax.cache.expiry.*; +import javax.cache.integration.*; +import javax.cache.processor.*; + +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheDistributionMode.*; + +/** + * Base test checking TTL and expiration of entries loaded from the store under a 500/600/700 ms expiry policy. + */ +public abstract class IgniteCacheExpiryPolicyWithStoreAbstractTest extends IgniteCacheAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected CacheStore<?, ?> cacheStore() { + return new TestStore(); + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration ccfg = super.cacheConfiguration(gridName); + + ccfg.setExpiryPolicyFactory(new Factory<ExpiryPolicy>() { + @Override public ExpiryPolicy create() { + return new ExpiryPolicy() { + @Override public Duration getExpiryForCreation() { + return new Duration(TimeUnit.MILLISECONDS, 500); + } + + @Override public Duration getExpiryForAccess() { + return new Duration(TimeUnit.MILLISECONDS, 600); + } + + @Override public Duration getExpiryForUpdate() { + return new Duration(TimeUnit.MILLISECONDS, 700); + } + }; + } + }); + + return ccfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testLoadAll() throws Exception { + IgniteCache<Integer, Integer> cache = jcache(0); + + final Integer key = primaryKey(cache); + + storeMap.put(key, 100); + + try { + CompletionListenerFuture fut = new CompletionListenerFuture(); + + cache.loadAll(F.asSet(key), false, fut); + + fut.get(); + + checkTtl(key, 500, false); + + assertEquals((Integer)100, cache.localPeek(key, CachePeekMode.ONHEAP)); + + U.sleep(600); + + checkExpired(key); + + cache = cache.withExpiryPolicy(new ExpiryPolicy() { + @Override public Duration getExpiryForCreation() { + return new Duration(TimeUnit.MILLISECONDS, 501); + } + + @Override public Duration getExpiryForAccess() { + return new Duration(TimeUnit.MILLISECONDS, 601); + } + + @Override public Duration getExpiryForUpdate() { + return new Duration(TimeUnit.MILLISECONDS, 701); + } + }); + + fut = new CompletionListenerFuture(); + + cache.loadAll(F.asSet(key), false, fut); + + fut.get(); + + checkTtl(key, 501, false); + } + finally { + cache.removeAll(); + } + } + + /** + * @throws Exception If failed. + */ + public void testLoadCache() throws Exception { + IgniteCache<Integer, Integer> cache = jcache(0); + + final Integer key = primaryKey(cache); + + storeMap.put(key, 100); + + try { + cache.loadCache(null); + + checkTtl(key, 500, false); + + assertEquals((Integer)100, cache.localPeek(key, CachePeekMode.ONHEAP)); + + U.sleep(600); + + checkExpired(key); + } + finally { + cache.removeAll(); + } + } + + /** + * @throws Exception If failed. + */ + public void _testReadThrough() throws Exception { + IgniteCache<Integer, Integer> cache = jcache(0); + + final Integer key = primaryKeys(cache, 1, 100_000).get(0); + + storeMap.put(key, 100); + + try { + Integer res = cache.invoke(key, new EntryProcessor<Integer, Integer, Integer>() { + @Override public Integer process(MutableEntry<Integer, Integer> e, Object... 
args) { + return e.getValue(); + } + }); + + assertEquals((Integer)100, res); + + checkTtl(key, 500, true); + + assertEquals((Integer)100, cache.localPeek(key, CachePeekMode.ONHEAP)); + + U.sleep(600); + + checkExpired(key); + } + finally { + cache.removeAll(); + } + } + + /** + * @param key Key expected to have no on-heap value on any grid. + */ + private void checkExpired(Integer key) { + for (int i = 0; i < gridCount(); i++) { + IgniteCache<Integer, Integer> cache = jcache(i); + + assertNull(cache.localPeek(key, CachePeekMode.ONHEAP)); + } + } + + /** + * @param key Key whose entry is looked up on every grid. + * @param ttl Expected TTL of each entry found. + * @throws Exception If failed. + */ + private void checkTtl(Object key, final long ttl, boolean primaryOnly) throws Exception { + boolean found = false; + + for (int i = 0; i < gridCount(); i++) { + IgniteKernal grid = (IgniteKernal)grid(i); + + GridCacheAdapter<Object, Object> cache = grid.context().cache().internalCache(); + + GridCacheEntryEx<Object, Object> e = cache.peekEx(key); + + if (e == null && cache.context().isNear()) + e = cache.context().near().dht().peekEx(key); + + if (e == null) { + if (primaryOnly) + assertTrue("Not found " + key, !cache.affinity().isPrimary(grid.localNode(), key)); + else + assertTrue("Not found " + key, !cache.affinity().isPrimaryOrBackup(grid.localNode(), key)); + } + else { + found = true; + + assertEquals("Unexpected ttl [grid=" + i + ", key=" + key +']', ttl, e.ttl()); + + if (ttl > 0) + assertTrue(e.expireTime() > 0); + else + assertEquals(0, e.expireTime()); + } + } + + assertTrue(found); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxExpiryPolicyWithStoreTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxExpiryPolicyWithStoreTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxExpiryPolicyWithStoreTest.java new file mode 100644 index 0000000..a1354a7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxExpiryPolicyWithStoreTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.expiry; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Expiry-policy-with-store tests for a PARTITIONED TRANSACTIONAL cache on 3 grids. + */ +public class IgniteCacheTxExpiryPolicyWithStoreTest extends IgniteCacheExpiryPolicyWithStoreAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a67831/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java index 906782e..82fc5e0 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java @@ -92,6 +92,7 @@ public class IgniteComputeGridTestSuite { suite.addTestSuite(GridMultinodeRedeployPrivateModeSelfTest.class); suite.addTestSuite(GridMultinodeRedeployIsolatedModeSelfTest.class); suite.addTestSuite(IgniteComputeEmptyClusterGroupTest.class); + suite.addTestSuite(IgniteComputeTopologyExceptionTest.class); return suite; }