# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d84da7d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d84da7d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d84da7d0 Branch: refs/heads/ignite-41 Commit: d84da7d0846036eddf8f300c56735f7c1ab1e948 Parents: 688a2e7 Author: sboikov <sboi...@gridgain.com> Authored: Wed Dec 17 12:16:32 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Dec 17 13:52:23 2014 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheContext.java | 2 +- .../processors/cache/GridCacheEntryEx.java | 4 +- .../processors/cache/GridCacheIoManager.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 40 +- .../processors/cache/GridCacheMessage.java | 9 + .../processors/cache/GridCacheTxEntry.java | 56 +- .../cache/GridCacheTxLocalAdapter.java | 47 +- .../dht/GridDhtTransactionalCacheAdapter.java | 12 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 3 +- .../distributed/near/GridNearLockRequest.java | 8 +- .../local/atomic/GridLocalAtomicCache.java | 65 +- .../cache/IgniteCacheAbstractTest.java | 152 ++++ .../processors/cache/IgniteCacheTest.java | 124 ---- .../IgniteCacheAtomicExpiryPolicyTest.java | 41 ++ .../IgniteCacheAtomicLocalExpiryPolicyTest.java | 41 ++ ...teCacheAtomicReplicatedExpiryPolicyTest.java | 24 + .../IgniteCacheExpiryPolicyAbstractTest.java | 714 +++++++++++++++++++ .../expiry/IgniteCacheExpiryPolicyTest.java | 507 ------------- .../IgniteCacheExpiryPolicyTestSuite.java | 35 + .../expiry/IgniteCacheTxExpiryPolicyTest.java | 41 ++ .../IgniteCacheTxLocalExpiryPolicyTest.java | 41 ++ ...IgniteCacheTxReplicatedExpiryPolicyTest.java | 26 + .../processors/cache/GridCacheTestEntryEx.java | 15 +- .../testframework/junits/GridAbstractTest.java | 8 + .../junits/common/GridCommonAbstractTest.java | 25 +- 
.../bamboo/GridDataGridTestSuite.java | 2 +- 26 files changed, 1343 insertions(+), 701 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java index 58fd1cd..931e243 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java @@ -283,7 +283,7 @@ public class GridCacheContext<K, V> implements Externalizable { Factory<ExpiryPolicy> factory = cacheCfg.getExpiryPolicyFactory(); - expiryPlc = factory.create(); + expiryPlc = factory != null ? factory.create() : null; if (expiryPlc instanceof EternalExpiryPolicy) expiryPlc = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java index fc8aaaa..76d73b3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java @@ -447,7 +447,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { * @param writeObj Value. Type depends on operation. * @param writeThrough Write through flag. * @param retval Return value flag. 
- * @param ttl Time to live. + * @param expiryPlc Expiry policy. * @param evt Event flag. * @param metrics Metrics update flag. * @param filter Optional filter to check. @@ -464,7 +464,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { @Nullable Object writeObj, boolean writeThrough, boolean retval, - long ttl, + @Nullable ExpiryPolicy expiryPlc, boolean evt, boolean metrics, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java index a50e461..7320595 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java @@ -200,7 +200,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V unmarshall(nodeId, cacheMsg); - log.info("Message: " + cacheMsg); + //log.info("Message: " + cacheMsg); if (cacheMsg.allowForStartup()) processMessage(nodeId, cacheMsg, c); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index 2c377e7..879f796 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -1112,7 +1112,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> @Nullable UUID subjId, String taskName ) throws IgniteCheckedException, GridCacheEntryRemovedException { - log.info("Inner set " + key + " " + val + " " + ttl); + // log.info("Inner set " + key + " " + val + " " + ttl); V old; @@ -1432,7 +1432,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> @Nullable Object writeObj, boolean writeThrough, boolean retval, - long ttl, + @Nullable ExpiryPolicy expiryPlc, boolean evt, boolean metrics, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, @@ -1456,13 +1456,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (isNew()) unswap(true, retval); - long newTtl = ttl; - - if (newTtl < 0) - newTtl = ttlExtras(); - - long newExpireTime = toExpireTime(newTtl); - // Possibly get old value form store. old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; @@ -1541,11 +1534,36 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> // Must persist inside synchronization in non-tx mode. cctx.store().putToStore(null, key, updated, ver); + long ttl; + long expireTime; + + if (expiryPlc != null) { + if (!hadVal) { + Duration duration = expiryPlc.getExpiryForCreation(); + + if (duration != null && duration.isZero()) + return new IgniteBiTuple<>(false, cctx.<V>unwrapTemporary(old)); + + ttl = toTtl(duration); + } + else + ttl = toTtl(expiryPlc.getExpiryForUpdate()); + + ttl = ttl < 0 ? ttlExtras() : ttl; + + expireTime = toExpireTime(ttl); + } + else { + ttl = ttlExtras(); + + expireTime = toExpireTime(ttl); + } + // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. 
- updateIndex(updated, null, newExpireTime, ver, old); + updateIndex(updated, null, expireTime, ver, old); - update(updated, null, newExpireTime, newTtl, ver); + update(updated, null, expireTime, ttl, ver); if (evt) { V evtOld = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java index 71eac41..882fc5a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java @@ -330,7 +330,12 @@ public abstract class GridCacheMessage<K, V> extends GridTcpCommunicationMessage assert ctx != null; if (txEntries != null) { + boolean transferExpiry = transferExpiryPolicy(); + for (GridCacheTxEntry<K, V> e : txEntries) { + if (transferExpiry) + e.transferExpiryPolicyIfNeeded(); + e.marshal(ctx); if (ctx.deploymentEnabled()) { @@ -342,6 +347,10 @@ public abstract class GridCacheMessage<K, V> extends GridTcpCommunicationMessage } } + protected boolean transferExpiryPolicy() { + return false; + } + /** * @param txEntries Entries to unmarshal. * @param ctx Context. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java index 91b9cc0..df888e7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java @@ -13,12 +13,14 @@ import org.apache.ignite.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.optimized.*; import org.gridgain.grid.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -128,6 +130,12 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab /** Data center replication version. */ private GridCacheVersion drVer; + /** Expiry policy. */ + private ExpiryPolicy expiryPlc; + + /** */ + private boolean transferExpiryPlc; + /** * Required by {@link Externalizable} */ @@ -147,8 +155,14 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab * @param entry Cache entry. * @param drVer Data center replication version. 
*/ - public GridCacheTxEntry(GridCacheContext<K, V> ctx, GridCacheTxEx<K, V> tx, GridCacheOperation op, V val, - long ttl, long drExpireTime, GridCacheEntryEx<K, V> entry, @Nullable GridCacheVersion drVer) { + public GridCacheTxEntry(GridCacheContext<K, V> ctx, + GridCacheTxEx<K, V> tx, + GridCacheOperation op, + V val, + long ttl, + long drExpireTime, + GridCacheEntryEx<K, V> entry, + @Nullable GridCacheVersion drVer) { assert ctx != null; assert tx != null; assert op != null; @@ -183,9 +197,15 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab * @param filters Put filters. * @param drVer Data center replication version. */ - public GridCacheTxEntry(GridCacheContext<K, V> ctx, GridCacheTxEx<K, V> tx, GridCacheOperation op, - V val, IgniteClosure<V, V> transformClos, long ttl, GridCacheEntryEx<K,V> entry, - IgnitePredicate<GridCacheEntry<K, V>>[] filters, GridCacheVersion drVer) { + public GridCacheTxEntry(GridCacheContext<K, V> ctx, + GridCacheTxEx<K, V> tx, + GridCacheOperation op, + V val, + IgniteClosure<V, V> transformClos, + long ttl, + GridCacheEntryEx<K,V> entry, + IgnitePredicate<GridCacheEntry<K, V>>[] filters, + GridCacheVersion drVer) { assert ctx != null; assert tx != null; assert op != null; @@ -285,6 +305,7 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab cp.grpLock = grpLock; cp.depEnabled = depEnabled; cp.drVer = drVer; + cp.expiryPlc = expiryPlc; return cp; } @@ -708,6 +729,13 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab } /** + * Marks expiry policy for transfer if it is explicitly set and differs from the default one. + */ + public void transferExpiryPolicyIfNeeded() { + transferExpiryPlc = expiryPlc != null && expiryPlc != ctx.expiry(); + } + + /** * @param ctx Context. * @throws IgniteCheckedException If failed.
*/ @@ -768,6 +796,20 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab val.unmarshal(this.ctx, clsLdr, depEnabled); } + /** + * @param expiryPlc Expiry policy. + */ + public void expiry(@Nullable ExpiryPolicy expiryPlc) { + this.expiryPlc = expiryPlc; + } + + /** + * @return Expiry policy. + */ + @Nullable public ExpiryPolicy expiry() { + return expiryPlc; + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeBoolean(depEnabled); @@ -793,6 +835,8 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab CU.writeVersion(out, explicitVer); out.writeBoolean(grpLock); CU.writeVersion(out, drVer); + + out.writeObject(transferExpiryPlc ? new GridCacheExpiryPolicy(expiryPlc) : null); } /** {@inheritDoc} */ @@ -821,6 +865,8 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab explicitVer = CU.readVersion(in); grpLock = in.readBoolean(); drVer = CU.readVersion(in); + + expiryPlc = (ExpiryPolicy)in.readObject(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java index c6888f1..7b6f266 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java @@ -653,7 +653,10 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K byte[] valBytes = res.get3(); if (op == CREATE || op == UPDATE && txEntry.drExpireTime() == -1L) { - ExpiryPolicy expiry = 
cacheCtx.expiry(); + ExpiryPolicy expiry = txEntry.expiry(); + + if (expiry == null) + expiry = cacheCtx.expiry(); if (expiry != null) { Duration duration = cached.hasValue() ? @@ -661,7 +664,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K txEntry.ttl(GridCacheMapEntry.toTtl(duration)); - log.info("Calculated expiry (userCommit), update=" + cached.hasValue() + ", ttl=" + txEntry.ttl() + ", detached=" + cached.detached()); + log.info("Calculated expiry (userCommit), update=" + cached.hasValue() + ", ttl=" + txEntry.ttl() + ", plc=" + expiry); } } @@ -2684,7 +2687,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K @Nullable V val, @Nullable IgniteClosure<V, V> transformClos, GridCacheEntryEx<K, V> entry, - @Nullable ExpiryPolicy expiryPlc, // TODO IGNITE-41 + @Nullable ExpiryPolicy expiryPlc, IgnitePredicate<GridCacheEntry<K, V>>[] filter, boolean filtersSet, long drTtl, @@ -2739,7 +2742,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K entryTtlDr(key, drTtl, drExpireTime); } else - entryTtl(key, ttl); + entryExpiry(key, expiryPlc); txEntry = old; @@ -2747,15 +2750,23 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K log.debug("Updated transaction entry: " + txEntry); } else { - long ttl = -1L; - - if (drTtl >= 0L) - ttl = drTtl; - - txEntry = new GridCacheTxEntry<>(entry.context(), this, op, val, transformClos, ttl, entry, filter, drVer); + boolean hasDrTtl = drTtl >= 0; + + txEntry = new GridCacheTxEntry<>(entry.context(), + this, + op, + val, + transformClos, + hasDrTtl ? drTtl : -1L, + entry, + filter, + drVer); txEntry.drExpireTime(drExpireTime); + if (!hasDrTtl) + txEntry.expiry(expiryPlc); + txMap.put(key, txEntry); if (log.isDebugEnabled()) @@ -2844,6 +2855,19 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K /** * @param key Key. + * @param expiryPlc Expiry policy. 
+ */ + void entryExpiry(GridCacheTxKey<K> key, @Nullable ExpiryPolicy expiryPlc) { + assert key != null; + + GridCacheTxEntry<K, V> e = entry(key); + + if (e != null) + e.expiry(expiryPlc); + } + + /** + * @param key Key. * @param ttl TTL. * @param expireTime Expire time. * @return {@code true} if tx entry exists for this key, {@code false} otherwise. @@ -2856,7 +2880,10 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K if (e != null) { e.ttl(ttl); + e.drExpireTime(expireTime); + + e.expiry(null); } return e != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 3f30801..7cee7d9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -692,8 +692,16 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach GridDhtLockFuture<K, V> fut = null; if (!req.inTx()) { - fut = new GridDhtLockFuture<>(ctx, nearNode.id(), req.version(), - req.topologyVersion(), cnt, req.txRead(), req.timeout(), tx, req.threadId(), filter); + fut = new GridDhtLockFuture<>(ctx, + nearNode.id(), + req.version(), + req.topologyVersion(), + cnt, + req.txRead(), + req.timeout(), + tx, + req.threadId(), + filter); // Add before mapping. 
if (!ctx.mvcc().addFuture(fut)) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 461ea04..44c397a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -512,7 +512,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte GridCacheTxEntry<K, V> w = writeEntries == null ? null : writeEntries.get(idx++); - txEntry = addEntry(NOOP, null, null, cached, -1, CU.<K, V>empty(), false, -1L, -1L, + txEntry = addEntry(NOOP, null, null, cached, null, CU.<K, V>empty(), false, -1L, -1L, drVers != null ? 
drVers[drVerIdx++] : null); if (w != null) { @@ -526,6 +526,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte txEntry.ttl(w.ttl()); txEntry.filters(w.filters()); txEntry.drExpireTime(w.drExpireTime()); + txEntry.expiry(w.expiry()); } txEntry.cached(cached, txEntry.keyBytes()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java index abcc189..4a1d501 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java @@ -284,8 +284,12 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> return dhtVers[idx]; } - /** {@inheritDoc} - * @param ctx*/ + /** {@inheritDoc} */ + @Override protected boolean transferExpiryPolicy() { + return true; + } + + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java index e4d6b00..433d199 100644 --- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -22,6 +22,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; import sun.misc.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -103,7 +104,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (V)updateAllInternal(UPDATE, Collections.singleton(key), Collections.singleton(val), - ttl, + expiryPerCall(), true, false, filter, @@ -124,7 +125,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (Boolean)updateAllInternal(UPDATE, Collections.singleton(key), Collections.singleton(val), - ttl, + expiryPerCall(), false, false, filter, @@ -142,7 +143,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (Boolean)updateAllInternal(UPDATE, Collections.singleton(key), Collections.singleton(val), - -1, + expiryPerCall(), false, false, filter, @@ -239,7 +240,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (GridCacheReturn<V>)updateAllInternal(UPDATE, Collections.singleton(key), Collections.singleton(newVal), - 0, + expiryPerCall(), true, true, ctx.equalsPeekArray(oldVal), @@ -256,7 +257,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (GridCacheReturn<V>)updateAllInternal(DELETE, Collections.singleton(key), null, - 0, + null, true, true, ctx.equalsPeekArray(val), @@ -292,7 +293,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { updateAllInternal(UPDATE, m.keySet(), m.values(), - 0, + expiryPerCall(), false, false, filter, @@ -314,7 +315,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { updateAllInternal(TRANSFORM, Collections.singleton(key), 
Collections.singleton(transformer), - -1, + expiryPerCall(), false, false, null, @@ -328,7 +329,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (R)updateAllInternal(TRANSFORM, Collections.singleton(key), Collections.singleton(new GridCacheTransformComputeClosure<>(transformer)), - -1, + expiryPerCall(), true, false, null, @@ -356,7 +357,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { updateAllInternal(TRANSFORM, m.keySet(), m.values(), - 0, + expiryPerCall(), false, false, null, @@ -383,7 +384,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (V)updateAllInternal(DELETE, Collections.singleton(key), null, - 0, + null, true, false, filter, @@ -409,7 +410,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { updateAllInternal(DELETE, keys, null, - 0, + null, false, false, filter, @@ -436,7 +437,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (Boolean)updateAllInternal(DELETE, Collections.singleton(key), null, - 0, + null, false, false, filter, @@ -464,7 +465,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (Boolean)updateAllInternal(DELETE, Collections.singleton(key), null, - 0, + null, false, false, ctx.equalsPeekArray(val), @@ -678,13 +679,14 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : null; final Collection<?> vals = map != null ? map.values() : transformMap != null ? 
transformMap.values() : null; final boolean storeEnabled = ctx.isStoreEnabled(); + final ExpiryPolicy expiry = expiryPerCall(); return asyncOp(new Callable<Object>() { @Override public Object call() throws Exception { return updateAllInternal(op, keys, vals, - ttl, + expiry, retval, rawRetval, filter, @@ -715,7 +717,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return updateAllInternal(DELETE, keys, null, - 0, + null, retval, rawRetval, filter, @@ -730,7 +732,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { * @param op Operation. * @param keys Keys. * @param vals Values. - * @param ttl Time to live. + * @param expiryPlc Expiry policy. * @param retval Return value required flag. * @param rawRetval Return {@code GridCacheReturn} instance. * @param filter Cache entry filter. @@ -742,7 +744,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { private Object updateAllInternal(GridCacheOperation op, Collection<? extends K> keys, @Nullable Iterable<?> vals, - long ttl, + @Nullable ExpiryPolicy expiryPlc, boolean retval, boolean rawRetval, IgnitePredicate<GridCacheEntry<K, V>>[] filter, @@ -762,7 +764,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { UUID subjId = ctx.subjectIdPerCall(null); if (storeEnabled && keys.size() > 1) { - updateWithBatch(op, keys, vals, ver, filter, subjId, taskName); + updateWithBatch(op, keys, vals, expiryPlc, ver, filter, subjId, taskName); return null; } @@ -793,7 +795,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { val, storeEnabled, retval, - ttl, + expiryPlc, true, true, filter, @@ -860,6 +862,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { GridCacheOperation op, Collection<? 
extends K> keys, @Nullable Iterable<?> vals, + @Nullable ExpiryPolicy expiryPlc, GridCacheVersion ver, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, UUID subjId, @@ -941,6 +944,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ver, putMap, null, + expiryPlc, err, subjId, taskName); @@ -971,6 +975,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ver, null, rmvKeys, + expiryPlc, err, subjId, taskName); @@ -1067,6 +1072,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ver, putMap, rmvKeys, + expiryPlc, err, subjId, taskName); @@ -1087,16 +1093,19 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { * @param ver Cache version. * @param putMap Values to put. * @param rmvKeys Keys to remove. + * @param expiryPlc Expiry policy. * @param err Optional partial update exception. * @param subjId Subject ID. * @param taskName Task name. * @return Partial update exception. */ @SuppressWarnings({"unchecked", "ConstantConditions", "ForLoopReplaceableByForEach"}) - @Nullable private GridCachePartialUpdateException updatePartialBatch(List<GridCacheEntryEx<K, V>> entries, + @Nullable private GridCachePartialUpdateException updatePartialBatch( + List<GridCacheEntryEx<K, V>> entries, final GridCacheVersion ver, @Nullable Map<K, V> putMap, @Nullable Collection<K> rmvKeys, + @Nullable ExpiryPolicy expiryPlc, @Nullable GridCachePartialUpdateException err, UUID subjId, String taskName @@ -1151,7 +1160,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { writeVal, false, false, - 0, + expiryPlc, true, true, null, @@ -1275,6 +1284,20 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { } /** + * @return Expiry policy. + */ + @Nullable private ExpiryPolicy expiryPerCall() { + GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); + + ExpiryPolicy expiry = prj != null ? 
prj.expiry() : null; + + if (expiry == null) + expiry = ctx.expiry(); + + return expiry; + } + + /** * @param op Operation closure. * @return Future. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java new file mode 100644 index 0000000..9293431 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java @@ -0,0 +1,152 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.grid.cache.*; +import org.gridgain.testframework.junits.common.*; + +import static org.gridgain.grid.cache.GridCacheMode.*; +import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Abstract class for cache tests. + */ +public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * @return Grids count to start. 
+ */ + protected abstract int gridCount(); + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(); + } + + /** + * @throws Exception If failed. + */ + protected void startGrids() throws Exception { + int cnt = gridCount(); + + assert cnt >= 1 : "At least one grid must be started"; + + startGridsMultiThreaded(cnt); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setMaxMissedHeartbeats(Integer.MAX_VALUE); + + disco.setIpFinder(ipFinder); + + if (isDebug()) + disco.setAckTimeout(Integer.MAX_VALUE); + + cfg.setDiscoverySpi(disco); + + cfg.setCacheConfiguration(cacheConfiguration(gridName)); + + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + return cfg; + } + + /** + * @param gridName Grid name. + * @return Cache configuration. + * @throws Exception In case of error. + */ + protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception { + GridCacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setSwapEnabled(swapEnabled()); + cfg.setCacheMode(cacheMode()); + cfg.setAtomicityMode(atomicityMode()); + cfg.setWriteSynchronizationMode(writeSynchronization()); + cfg.setDistributionMode(distributionMode()); + cfg.setPortableEnabled(portableEnabled()); + + if (cacheMode() == PARTITIONED) + cfg.setBackups(1); + + return cfg; + } + + /** + * @return Default cache mode. + */ + protected abstract GridCacheMode cacheMode(); + + /** + * @return Cache atomicity mode. + */ + protected abstract GridCacheAtomicityMode atomicityMode(); + + /** + * @return Partitioned mode. 
+ */ + protected abstract GridCacheDistributionMode distributionMode(); + + /** + * @return Write synchronization. + */ + protected GridCacheWriteSynchronizationMode writeSynchronization() { + return FULL_SYNC; + } + + /** + * @return Whether portable mode is enabled. + */ + protected boolean portableEnabled() { + return false; + } + + /** + * @return {@code true} if swap should be enabled. + */ + protected boolean swapEnabled() { + return false; + } + + /** + * @return Cache. + */ + protected <K, V> IgniteCache<K, V> jcache() { + return jcache(0); + } + + /** + * @param idx Grid index. + * @return Cache. + */ + protected <K, V> IgniteCache<K, V> jcache(int idx) { + return grid(idx).jcache(null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java deleted file mode 100644 index ef0b4c3..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import 
org.gridgain.grid.cache.*; -import org.gridgain.testframework.junits.common.*; - -/** - * - */ -public class IgniteCacheTest extends GridCommonAbstractTest { - /** */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** - * @return Grids count to start. - */ - protected int gridCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGrids(); - } - - /** - * @throws Exception If failed. - */ - protected void startGrids() throws Exception { - int cnt = gridCount(); - - assert cnt >= 1 : "At least one grid must be started"; - - startGridsMultiThreaded(cnt); - - awaitPartitionMapExchange(); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setMaxMissedHeartbeats(Integer.MAX_VALUE); - - disco.setIpFinder(ipFinder); - - if (isDebug()) - disco.setAckTimeout(Integer.MAX_VALUE); - - cfg.setDiscoverySpi(disco); - - cfg.setCacheConfiguration(cacheConfiguration(gridName)); - - cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); - - return cfg; - } - - /** - * @param gridName Grid name. - * @return Cache configuration. - * @throws Exception In case of error. - */ - protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception { - GridCacheConfiguration cfg = defaultCacheConfiguration(); - - return cfg; - } - - /** - * @throws Exception If failed. 
- */ - public void testPutGetRemove() throws Exception { - IgniteCache<Integer, String> cache = jcache(); - - for (int i = 0; i < 10; i++) - cache.put(i, String.valueOf(i)); - - for (int i = 0; i < 10; i++) - assertEquals(String.valueOf(i), cache.get(i)); - - for (int i = 0; i < 10; i++) - cache.remove(i); - - for (int i = 0; i < 10; i++) - assertNull(cache.get(i)); - } - - /** - * @return Cache. - */ - protected <K, V> IgniteCache<K, V> jcache() { - return jcache(0); - } - - /** - * @param idx Grid index. - * @return Cache. - */ - protected <K, V> IgniteCache<K, V> jcache(int idx) { - return grid(idx).jcache(null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java new file mode 100644 index 0000000..251bc05 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java @@ -0,0 +1,41 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache.expiry; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheAtomicExpiryPolicyTest 
extends IgniteCacheExpiryPolicyAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicLocalExpiryPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicLocalExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicLocalExpiryPolicyTest.java new file mode 100644 index 0000000..20c9666 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicLocalExpiryPolicyTest.java @@ -0,0 +1,41 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache.expiry; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheAtomicLocalExpiryPolicyTest extends IgniteCacheExpiryPolicyAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** 
{@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return LOCAL; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicReplicatedExpiryPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicReplicatedExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicReplicatedExpiryPolicyTest.java new file mode 100644 index 0000000..bca7705 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicReplicatedExpiryPolicyTest.java @@ -0,0 +1,24 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache.expiry; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheAtomicReplicatedExpiryPolicyTest extends IgniteCacheAtomicExpiryPolicyTest { + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return REPLICATED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java new file mode 100644 index 0000000..c8abd0e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -0,0 +1,714 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache.expiry; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.cache.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; + +import javax.cache.configuration.*; +import javax.cache.expiry.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; +import static org.gridgain.grid.cache.GridCacheTxConcurrency.*; +import static org.gridgain.grid.cache.GridCacheTxIsolation.*; + +/** + * + */ +public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbstractTest { + /** */ + private Factory<? 
extends ExpiryPolicy> factory; + + /** */ + private boolean nearCache; + + /** */ + private Integer lastKey = 0; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testEternal() throws Exception { + factory = EternalExpiryPolicy.factoryOf(); + + startGrids(); + + for (final Integer key : keys()) { + log.info("Test eternalPolicy, key: " + key); + + eternal(key); + } + } + + /** + * @throws Exception If failed. + */ + public void testNullFactory() throws Exception { + factory = null; + + startGrids(); + + for (final Integer key : keys()) { + log.info("Test eternalPolicy, key: " + key); + + eternal(key); + } + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void eternal(Integer key) throws Exception { + IgniteCache<Integer, Integer> cache = jcache(); + + cache.put(key, 1); // Create. + + checkTtl(key, 0); + + assertEquals((Integer) 1, cache.get(key)); // Get. + + checkTtl(key, 0); + + cache.put(key, 2); // Update. + + checkTtl(key, 0); + + assertTrue(cache.remove(key)); // Remove. + + /* + cache.withExpiryPolicy(new TestPolicy(60_000L, null, null)).put(key, 1); // Create with custom. + + checkTtl(key, 60_000L); + + cache.put(key, 2); // Update. + + checkTtl(key, 0); + + cache.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 1); + + checkTtl(key, 1000L); + + waitExpired(key); + */ + } + + /** + * @throws Exception If failed. 
+ */ + public void testCreateUpdate() throws Exception { + factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); + + startGrids(); + + for (final Integer key : keys()) { + log.info("Test createUpdate [key=" + key + ']'); + + createUpdate(key, null); + } + + for (final Integer key : keys()) { + log.info("Test createUpdateCustomPolicy [key=" + key + ']'); + + createUpdateCustomPolicy(key, null); + } + + createUpdatePutAll(null); + + GridCacheTxConcurrency[] txModes = {PESSIMISTIC}; + + if (atomicityMode() == TRANSACTIONAL) { + for (GridCacheTxConcurrency tx : txModes) { + for (final Integer key : keys()) { + log.info("Test createUpdate [key=" + key + ", tx=" + tx + ']'); + + createUpdate(key, tx); + } + + for (final Integer key : keys()) { + log.info("Test createUpdateCustomPolicy [key=" + key + ", tx=" + tx + ']'); + + createUpdateCustomPolicy(key, tx); + } + + createUpdatePutAll(tx); + } + } + } + + /** + * @param txConcurrency Not null transaction concurrency mode if explicit transaction should be started. + * @throws Exception If failed. + */ + private void createUpdatePutAll(@Nullable GridCacheTxConcurrency txConcurrency) throws Exception { + Map<Integer, Integer> vals = new HashMap<>(); + + for (int i = 0; i < 1000; i++) + vals.put(i, i); + + IgniteCache<Integer, Integer> cache = jcache(0); + + cache.removeAll(vals.keySet()); + + GridCacheTx tx = startTx(txConcurrency); + + // Create. + cache.putAll(vals); + + if (tx != null) + tx.commit(); + + for (Integer key : vals.keySet()) + checkTtl(key, 60_000); + + tx = startTx(txConcurrency); + + // Update. + cache.putAll(vals); + + if (tx != null) + tx.commit(); + + for (Integer key : vals.keySet()) + checkTtl(key, 61_000); + + tx = startTx(txConcurrency); + + // Update with provided TTL. 
+ cache.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals); + + if (tx != null) + tx.commit(); + + for (Integer key : vals.keySet()) + checkTtl(key, 1000L); + + waitExpired(vals.keySet()); + + tx = startTx(txConcurrency); + + // Try create again. + cache.putAll(vals); + + if (tx != null) + tx.commit(); + + for (Integer key : vals.keySet()) + checkTtl(key, 60_000L); + + Map<Integer, Integer> newVals = new HashMap<>(vals); + + newVals.put(100_000, 1); + + // Updates and create. + cache.putAll(newVals); + + for (Integer key : vals.keySet()) + checkTtl(key, 61_000L); + + checkTtl(100_000, 60_000L); + + cache.removeAll(newVals.keySet()); + } + + /** + * @param key Key. + * @param txConcurrency Not null transaction concurrency mode if explicit transaction should be started. + * @throws Exception If failed. + */ + private void createUpdateCustomPolicy(Integer key, @Nullable GridCacheTxConcurrency txConcurrency) + throws Exception { + IgniteCache<Integer, Integer> cache = jcache(); + + assertNull(cache.get(key)); + + GridCacheTx tx = startTx(txConcurrency); + + cache.withExpiryPolicy(new TestPolicy(10_000L, 20_000L, 30_000L)).put(key, 1); + + if (tx != null) + tx.commit(); + + checkTtl(key, 10_000L); + + for (int idx = 0; idx < gridCount(); idx++) { + assertEquals(1, cache(idx).get(key)); // Try get. + + checkTtl(key, 10_000); + } + + tx = startTx(txConcurrency); + + // Update, returns null duration, should not change TTL. + cache.withExpiryPolicy(new TestPolicy(20_000L, null, null)).put(key, 2); + + if (tx != null) + tx.commit(); + + checkTtl(key, 10_000L); + + tx = startTx(txConcurrency); + + // Update with provided TTL. + cache.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 2); + + if (tx != null) + tx.commit(); + + checkTtl(key, 1000L); + + waitExpired(key); + + tx = startTx(txConcurrency); + + // Create, returns null duration, should create with 0 TTL. 
+ cache.withExpiryPolicy(new TestPolicy(null, 20_000L, 30_000L)).put(key, 1); + + if (tx != null) + tx.commit(); + + checkTtl(key, 0L); + } + + public void _testPrimary() throws Exception { + factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); + + nearCache = true; + + boolean inTx = true; + + startGrids(); + + IgniteCache<Integer, Integer> cache = jcache(0); + + GridCache<Integer, Object> cache0 = cache(0); + + Integer key = primaryKey(cache0); + + log.info("Create: " + key); + + GridCacheTx tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null; + + cache.put(key, 1); + + if (tx != null) + tx.commit(); + + checkTtl(key, 60_000); + + tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null; + + log.info("Update: " + key); + + cache.put(key, 2); + + if (tx != null) + tx.commit(); + + checkTtl(key, 61_000); + } + + /** + * @throws Exception If failed. + */ + public void _test1() throws Exception { + factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); + + nearCache = false; + + boolean inTx = true; + + startGrids(); + + Collection<Integer> keys = keys(); + + IgniteCache<Integer, Integer> cache = jcache(0); + + for (final Integer key : keys) { + log.info("Test key1: " + key); + + GridCacheTx tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null; + + cache.put(key, 1); + + if (tx != null) + tx.commit(); + + log.info("Test key2: " + key); + + tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null; + + cache.put(key, 2); + + if (tx != null) + tx.commit(); + + log.info("Done"); + } + } + + /** + * @param key Key. + * @param txConcurrency Not null transaction concurrency mode if explicit transaction should be started. + * @throws Exception If failed. 
+ */ + private void createUpdate(Integer key, @Nullable GridCacheTxConcurrency txConcurrency) + throws Exception { + IgniteCache<Integer, Integer> cache = jcache(); + + // Run several times to make sure create after remove works as expected. + for (int i = 0; i < 3; i++) { + log.info("Iteration: " + i); + + GridCacheTx tx = startTx(txConcurrency); + + cache.put(key, 1); // Create. + + if (tx != null) + tx.commit(); + + checkTtl(key, 60_000); + + for (int idx = 0; idx < gridCount(); idx++) { + assertEquals(1, cache(idx).get(key)); // Try get. + + checkTtl(key, 60_000); + } + + tx = startTx(txConcurrency); + + cache.put(key, 2); // Update. + + if (tx != null) + tx.commit(); + + checkTtl(key, 61_000); + + for (int idx = 0; idx < gridCount(); idx++) { + assertEquals(2, cache(idx).get(key)); // Try get. + + checkTtl(key, 61_000); + } + + tx = startTx(txConcurrency); + + assertTrue(cache.remove(key)); + + if (tx != null) + tx.commit(); + + for (int idx = 0; idx < gridCount(); idx++) + assertNull(cache(idx).get(key)); + } + } + + /** + * @param txConcurrency Transaction concurrency mode. + * @return Transaction. + */ + @Nullable private GridCacheTx startTx(@Nullable GridCacheTxConcurrency txConcurrency) { + return txConcurrency == null ? null : ignite(0).transactions().txStart(txConcurrency, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + public void testNearCreateUpdate() throws Exception { + if (cacheMode() != PARTITIONED) + return; + + factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); + + nearCache = true; + + startGrids(); + + Integer key = nearKey(cache(0)); + + IgniteCache<Integer, Integer> jcache0 = jcache(0); + + jcache0.put(key, 1); + + checkTtl(key, 60_000); + + IgniteCache<Integer, Integer> jcache1 = jcache(1); + + // Update from another node. + jcache1.put(key, 2); + + checkTtl(key, 61_000L); + + // Update from another node with provided TTL. 
+ jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 3); + + checkTtl(key, 1000); + + waitExpired(key); + + // Try create again. + jcache0.put(key, 1); + + checkTtl(key, 60_000); + + // Update from near node with provided TTL. + jcache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2); + + checkTtl(key, 1100); + + waitExpired(key); + } + + /** + * @throws Exception If failed. + */ + public void testNearPutAll() throws Exception { + if (cacheMode() != PARTITIONED) + return; + + factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); + + nearCache = true; + + startGrids(); + + Map<Integer, Integer> vals = new HashMap<>(); + + for (int i = 0; i < 1000; i++) + vals.put(i, i); + + IgniteCache<Integer, Integer> jcache0 = jcache(0); + + jcache0.putAll(vals); + + for (Integer key : vals.keySet()) + checkTtl(key, 60_000); + + IgniteCache<Integer, Integer> jcache1 = jcache(1); + + // Update from another node. + jcache1.putAll(vals); + + for (Integer key : vals.keySet()) + checkTtl(key, 61_000); + + // Update from another node with provided TTL. + jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals); + + for (Integer key : vals.keySet()) + checkTtl(key, 1000); + + waitExpired(vals.keySet()); + + // Try create again. + jcache0.putAll(vals); + + // Update from near node with provided TTL. + jcache1.withExpiryPolicy(new TestPolicy(null, 1101L, null)).putAll(vals); + + for (Integer key : vals.keySet()) + checkTtl(key, 1101L); + + waitExpired(vals.keySet()); + } + + /** + * @return Test keys. + * @throws Exception If failed. 
+ */ + private Collection<Integer> keys() throws Exception { + GridCache<Integer, Object> cache = cache(0); + + ArrayList<Integer> keys = new ArrayList<>(); + + keys.add(primaryKeys(cache, 1, lastKey).get(0)); + + if (gridCount() > 1) { + keys.add(backupKeys(cache, 1, lastKey).get(0)); + + if (cache.configuration().getCacheMode() != REPLICATED) + keys.add(nearKeys(cache, 1, lastKey).get(0)); + } + + lastKey = Collections.max(keys) + 1; + + return keys; + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void waitExpired(Integer key) throws Exception { + waitExpired(Collections.singleton(key)); + } + + /** + * @param keys Keys. + * @throws Exception If failed. + */ + private void waitExpired(final Collection<Integer> keys) throws Exception { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (int i = 0; i < gridCount(); i++) { + for (Integer key : keys) { + Object val = jcache(i).localPeek(key); + + if (val != null) { + // log.info("Value [grid=" + i + ", val=" + val + ']'); + + return false; + } + } + } + + return false; + } + }, 3000); + + GridCache<Integer, Object> cache = cache(0); + + for (int i = 0; i < gridCount(); i++) { + ClusterNode node = grid(i).cluster().localNode(); + + for (Integer key : keys) { + Object val = jcache(i).localPeek(key); + + if (val != null) { + log.info("Unexpected value [grid=" + i + + ", primary=" + cache.affinity().isPrimary(node, key) + + ", backup=" + cache.affinity().isBackup(node, key) + ']'); + } + + assertNull("Unexpected non-null value for grid " + i, val); + } + } + + for (int i = 0; i < gridCount(); i++) { + for (Integer key : keys) + assertNull("Unexpected non-null value for grid " + i, jcache(i).get(key)); + } + } + + /** + * @param key Key. + * @param ttl TTL. + * @throws Exception If failed. 
+ */ + private void checkTtl(Object key, long ttl) throws Exception { + boolean found = false; + + for (int i = 0; i < gridCount(); i++) { + GridKernal grid = (GridKernal)grid(i); + + GridCacheAdapter<Object, Object> cache = grid.context().cache().internalCache(); + + GridCacheEntryEx<Object, Object> e = cache.peekEx(key); + + if (e == null && cache.context().isNear()) + e = cache.context().near().dht().peekEx(key); + + if (e == null) + assertTrue(!cache.affinity().isPrimaryOrBackup(grid.localNode(), key)); + else { + found = true; + + assertEquals("Unexpected ttl for grid " + i, ttl, e.ttl()); + + if (ttl > 0) + assertTrue(e.expireTime() > 0); + else + assertEquals(0, e.expireTime()); + } + } + + assertTrue(found); + } + + /** {@inheritDoc} */ + @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception { + GridCacheConfiguration cfg = super.cacheConfiguration(gridName); + + if (nearCache && gridName.equals(getTestGridName(0))) + cfg.setDistributionMode(NEAR_PARTITIONED); + + cfg.setExpiryPolicyFactory(factory); + + return cfg; + } + + /** + * + */ + private class TestPolicy implements ExpiryPolicy { + /** */ + private Long create; + + /** */ + private Long access; + + /** */ + private Long update; + + /** + * @param create TTL for creation. + * @param access TTL for access. + * @param update TTL for update. + */ + TestPolicy(@Nullable Long create, + @Nullable Long update, + @Nullable Long access) { + this.create = create; + this.update = update; + this.access = access; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForCreation() { + return create != null ? new Duration(TimeUnit.MILLISECONDS, create) : null; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForAccess() { + return access != null ? new Duration(TimeUnit.MILLISECONDS, access) : null; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForUpdate() { + return update != null ? 
new Duration(TimeUnit.MILLISECONDS, update) : null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestPolicy.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java deleted file mode 100644 index f96e5a7..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java +++ /dev/null @@ -1,507 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.apache.ignite.internal.processors.cache.expiry; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.processors.cache.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.kernal.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.util.lang.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.gridgain.testframework.*; -import org.jetbrains.annotations.*; - -import javax.cache.configuration.*; -import javax.cache.expiry.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; -import static org.gridgain.grid.cache.GridCacheDistributionMode.*; -import static org.gridgain.grid.cache.GridCacheMode.*; - -/** - * - */ -public class 
IgniteCacheExpiryPolicyTest extends IgniteCacheTest { - /** */ - private Factory<? extends ExpiryPolicy> factory; - - /** */ - private boolean nearCache; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - // No-op. - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - public void testPrimary() throws Exception { - factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); - - nearCache = false; - - boolean inTx = false; - - startGrids(); - - IgniteCache<Integer, Integer> cache = jcache(0); - - GridCache<Integer, Object> cache0 = cache(0); - - Integer key = primaryKey(cache0); - - log.info("Create: " + key); - - GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null; - - cache.put(key, 1); - - if (tx != null) - tx.commit(); - - checkTtl(key, 60_000); - - tx = inTx ? grid(0).transactions().txStart() : null; - - log.info("Update: " + key); - - cache.put(key, 2); - - if (tx != null) - tx.commit(); - - checkTtl(key, 61_000); - } - - public void testBackup() throws Exception { - factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); - - nearCache = false; - - boolean inTx = false; - - startGrids(); - - IgniteCache<Integer, Integer> cache = jcache(0); - - GridCache<Integer, Object> cache0 = cache(0); - - Integer key = backupKey(cache0); - - log.info("Create: " + key); - - GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null; - - cache.put(key, 1); - - if (tx != null) - tx.commit(); - - checkTtl(key, 60_000); - - tx = inTx ? 
grid(0).transactions().txStart() : null; - - log.info("Update: " + key); - - cache.put(key, 2); - - if (tx != null) - tx.commit(); - - checkTtl(key, 61_000); - } - - public void testNear() throws Exception { - factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); - - nearCache = false; - - boolean inTx = true; - - startGrids(); - - IgniteCache<Integer, Integer> cache = jcache(0); - - GridCache<Integer, Object> cache0 = cache(0); - - Integer key = nearKey(cache0); - - log.info("Create: " + key); - - GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null; - - cache.put(key, 1); - - if (tx != null) - tx.commit(); - - checkTtl(key, 60_000); - - tx = inTx ? grid(0).transactions().txStart() : null; - - log.info("Update: " + key); - - cache.put(key, 2); - - if (tx != null) - tx.commit(); - - checkTtl(key, 61_000); - } - - /** - * @throws Exception If failed. - */ - public void test1() throws Exception { - factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null)); - - nearCache = false; - - boolean inTx = true; - - startGrids(); - - Collection<Integer> keys = keys(); - - IgniteCache<Integer, Integer> cache = jcache(0); - - for (final Integer key : keys) { - log.info("Test key1: " + key); - - GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null; - - cache.put(key, 1); - - if (tx != null) - tx.commit(); - } - - for (final Integer key : keys) { - log.info("Test key2: " + key); - - GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null; - - cache.put(key, 2); - - if (tx != null) - tx.commit(); - } - } - - /** - * @throws Exception If failed. 
- */ - public void testCreated() throws Exception { - factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null)); - - startGrids(); - - Collection<Integer> keys = keys(); - - IgniteCache<Integer, Integer> cache = jcache(0); - - for (final Integer key : keys) { - log.info("Test key: " + key); - - cache.put(key, 1); - - checkTtl(key, 60_000); - - for (int i = 0; i < gridCount(); i++) { - assertEquals((Integer)1, cache.get(key)); - - checkTtl(key, 60_000); - } - - cache.withExpiryPolicy(new TestPolicy(1000L, null, null)).put(key, 2); // Update, should not change TTL. - - checkTtl(key, 60_000); - - assertEquals((Integer)2, cache.get(key)); - - assertTrue(cache.remove(key)); - - cache.withExpiryPolicy(new TestPolicy(1000L, null, null)).put(key, 3); // Create with provided TTL. - - checkTtl(key, 1000); - - waitExpired(key); - } - } - - /** - * @throws Exception If failed. - */ - public void testNearPut() throws Exception { - factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null)); - - nearCache = true; - - startGrids(); - - GridCache<Integer, Object> cache0 = cache(0); - - Integer key = nearKey(cache0); - - IgniteCache<Integer, Integer> jcache0 = jcache(0); - - jcache0.put(key, 1); - - checkTtl(key, 60_000); - - IgniteCache<Integer, Integer> jcache1 = jcache(1); - - // Update from another node with provided TTL. - jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 2); - - checkTtl(key, 1000); - - waitExpired(key); - - jcache1.remove(key); - - jcache0.put(key, 1); - - checkTtl(key, 60_000); - - // Update from near node with provided TTL. - jcache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2); - - checkTtl(key, 1100); - - waitExpired(key); - } - - /** - * @throws Exception If failed. 
- */ - public void testNearPutAll() throws Exception { - factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null)); - - nearCache = true; - - startGrids(); - - Map<Integer, Integer> vals = new HashMap<>(); - - for (int i = 0; i < 1000; i++) - vals.put(i, i); - - IgniteCache<Integer, Integer> jcache0 = jcache(0); - - jcache0.putAll(vals); - - for (Integer key : vals.keySet()) - checkTtl(key, 60_000); - - IgniteCache<Integer, Integer> jcache1 = jcache(1); - - // Update from another node with provided TTL. - jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals); - - for (Integer key : vals.keySet()) - checkTtl(key, 1000); - - waitExpired(vals.keySet()); - - jcache0.removeAll(vals.keySet()); - - jcache0.putAll(vals); - - // Update from near node with provided TTL. - jcache1.withExpiryPolicy(new TestPolicy(null, 1101L, null)).putAll(vals); - - for (Integer key : vals.keySet()) - checkTtl(key, 1101L); - - waitExpired(vals.keySet()); - } - - /** - * @return Test keys. - * @throws Exception If failed. - */ - private Collection<Integer> keys() throws Exception { - GridCache<Integer, Object> cache = cache(0); - - Collection<Integer> keys = new ArrayList<>(); - - keys.add(primaryKey(cache)); - - if (gridCount() > 1) { - keys.add(backupKey(cache)); - - if (cache.configuration().getCacheMode() != REPLICATED) - keys.add(nearKey(cache)); - } - - return keys; - } - - /** - * @param key Key. - * @throws Exception If failed. - */ - private void waitExpired(Integer key) throws Exception { - waitExpired(Collections.singleton(key)); - } - - /** - * @param keys Keys. - * @throws Exception If failed. 
- */ - private void waitExpired(final Collection<Integer> keys) throws Exception { - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - for (int i = 0; i < gridCount(); i++) { - for (Integer key : keys) { - Object val = jcache(i).localPeek(key); - - if (val != null) { - // log.info("Value [grid=" + i + ", val=" + val + ']'); - - return false; - } - } - } - - return false; - } - }, 3000); - - GridCache<Integer, Object> cache = cache(0); - - for (int i = 0; i < gridCount(); i++) { - ClusterNode node = grid(i).cluster().localNode(); - - for (Integer key : keys) { - Object val = jcache(i).localPeek(key); - - if (val != null) { - log.info("Unexpected value [grid=" + i + - ", primary=" + cache.affinity().isPrimary(node, key) + - ", backup=" + cache.affinity().isBackup(node, key) + ']'); - } - - assertNull("Unexpected non-null value for grid " + i, val); - } - } - - for (int i = 0; i < gridCount(); i++) { - for (Integer key : keys) - assertNull("Unexpected non-null value for grid " + i, jcache(i).get(key)); - } - } - - /** - * @param key Key. - * @param ttl TTL. - * @throws Exception If failed. 
- */ - private void checkTtl(Object key, long ttl) throws Exception { - for (int i = 0; i < gridCount(); i++) { - GridKernal grid = (GridKernal)grid(i); - - GridCacheAdapter<Object, Object> cache = grid.context().cache().internalCache(); - - GridCacheEntryEx<Object, Object> e = cache.peekEx(key); - - if (e == null && cache.context().isNear()) - e = cache.context().near().dht().peekEx(key); - - if (e == null) - assertTrue(!cache.affinity().isPrimaryOrBackup(grid.localNode(), key)); - else - assertEquals("Unexpected ttl for grid " + i, ttl, e.ttl()); - } - } - - /** {@inheritDoc} */ - @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception { - assert factory != null; - - GridCacheConfiguration cfg = super.cacheConfiguration(gridName); - - cfg.setCacheMode(PARTITIONED); - cfg.setAtomicityMode(TRANSACTIONAL); - - //cfg.setAtomicityMode(ATOMIC); - - cfg.setBackups(1); - - if (nearCache && gridName.equals(getTestGridName(0))) - cfg.setDistributionMode(NEAR_PARTITIONED); - else - cfg.setDistributionMode(PARTITIONED_ONLY); - - cfg.setExpiryPolicyFactory(factory); - - return cfg; - } - - /** - * - */ - private class TestPolicy implements ExpiryPolicy { - /** */ - private Long create; - - /** */ - private Long access; - - /** */ - private Long update; - - /** - * @param create TTL for creation. - * @param access TTL for access. - * @param update TTL for update. - */ - TestPolicy(@Nullable Long create, - @Nullable Long update, - @Nullable Long access) { - this.create = create; - this.update = update; - this.access = access; - } - - /** {@inheritDoc} */ - @Override public Duration getExpiryForCreation() { - return create != null ? new Duration(TimeUnit.MILLISECONDS, create) : null; - } - - /** {@inheritDoc} */ - @Override public Duration getExpiryForAccess() { - return access != null ? 
new Duration(TimeUnit.MILLISECONDS, access) : null; - } - - /** {@inheritDoc} */ - @Override public Duration getExpiryForUpdate() { - return update != null ? new Duration(TimeUnit.MILLISECONDS, update) : null; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TestPolicy.class, this); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java new file mode 100644 index 0000000..fd2d205 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java @@ -0,0 +1,35 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache.expiry; + +import junit.framework.*; + +/** + * + */ +public class IgniteCacheExpiryPolicyTestSuite extends TestSuite { + /** + * @return Cache API test suite. + * @throws Exception If failed. 
+ */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Cache Expiry Policy Test Suite"); + + suite.addTestSuite(IgniteCacheAtomicLocalExpiryPolicyTest.class); + suite.addTestSuite(IgniteCacheAtomicExpiryPolicyTest.class); + suite.addTestSuite(IgniteCacheAtomicReplicatedExpiryPolicyTest.class); + + suite.addTestSuite(IgniteCacheTxLocalExpiryPolicyTest.class); + suite.addTestSuite(IgniteCacheTxExpiryPolicyTest.class); + suite.addTestSuite(IgniteCacheTxReplicatedExpiryPolicyTest.class); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxExpiryPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxExpiryPolicyTest.java new file mode 100644 index 0000000..0abe27d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxExpiryPolicyTest.java @@ -0,0 +1,41 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache.expiry; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheTxExpiryPolicyTest extends IgniteCacheExpiryPolicyAbstractTest { + /** {@inheritDoc} */ + @Override protected int 
gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +}