# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/76429793 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/76429793 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/76429793 Branch: refs/heads/ignite-41 Commit: 764297932db43970d4cdcdb54475064bb76d44a1 Parents: 91227df Author: sboikov <sboi...@gridgain.com> Authored: Wed Dec 17 18:11:41 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Dec 17 18:11:41 2014 +0300 ---------------------------------------------------------------------- .../cache/GridCacheAccessExpiryPolicy.java | 85 ++++++ .../processors/cache/GridCacheAdapter.java | 3 +- .../processors/cache/GridCacheContext.java | 11 +- .../processors/cache/GridCacheEntryEx.java | 6 +- .../processors/cache/GridCacheMapEntry.java | 39 ++- .../processors/cache/GridCacheTxAdapter.java | 3 +- .../cache/GridCacheTxLocalAdapter.java | 19 +- .../distributed/GridCacheTtlUpdateRequest.java | 285 +++++++++++++++++++ .../distributed/dht/GridDhtCacheAdapter.java | 58 +++- .../dht/GridDhtTransactionalCacheAdapter.java | 3 +- .../dht/GridPartitionedGetFuture.java | 14 +- .../dht/atomic/GridDhtAtomicCache.java | 83 +++++- .../dht/colocated/GridDhtColocatedCache.java | 27 +- .../distributed/near/GridNearCacheAdapter.java | 11 +- .../distributed/near/GridNearGetFuture.java | 10 +- .../local/atomic/GridLocalAtomicCache.java | 12 +- .../GridTcpCommunicationMessageFactory.java | 7 +- .../IgniteCacheExpiryPolicyAbstractTest.java | 110 +++---- .../IgniteCacheExpiryPolicyTestSuite.java | 2 +- .../processors/cache/GridCacheTestEntryEx.java | 3 +- 20 files changed, 649 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java new file mode 100644 index 0000000..0b6152d --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java @@ -0,0 +1,85 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.gridgain.grid.kernal.processors.cache.distributed.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import javax.cache.expiry.*; + +/** + * + */ +public class GridCacheAccessExpiryPolicy { + /** */ + private final long ttl; + + /** */ + private GridCacheTtlUpdateRequest req; + + /** + * @param expiryPlc Expiry policy. + * @return Access expire policy. + */ + public static GridCacheAccessExpiryPolicy forPolicy(@Nullable ExpiryPolicy expiryPlc) { + if (expiryPlc == null) + return null; + + Duration duration = expiryPlc.getExpiryForAccess(); + + if (duration == null) + return null; + + return new GridCacheAccessExpiryPolicy(GridCacheMapEntry.toTtl(duration)); + } + + /** + * @param ttl TTL for access. + */ + public GridCacheAccessExpiryPolicy(long ttl) { + assert ttl >= 0 : ttl; + + this.ttl = ttl; + } + + /** + * @return TTL. + */ + public long ttl() { + return ttl; + } + + /** + * @param key Entry key. + * @param keyBytes Entry key bytes. + * @param ver Entry version. + */ + @SuppressWarnings("unchecked") + public void ttlUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) { + if (req == null) + req = new GridCacheTtlUpdateRequest(ttl); + + req.addEntry(key, keyBytes, ver); + } + + /** + * @return TTL update request. + */ + @SuppressWarnings("unchecked") + @Nullable public <K, V> GridCacheTtlUpdateRequest<K, V> request() { + return (GridCacheTtlUpdateRequest<K, V>)req; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheAccessExpiryPolicy.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index 39b7338..1f40a7c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -1812,7 +1812,8 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im subjId, null, taskName, - filter); + filter, + null); GridCacheVersion ver = entry.version(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java index 931e243..cb4337e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java @@ -1090,8 +1090,17 @@ public class GridCacheContext<K, V> implements Externalizable { if (subjId != null) return subjId; - GridCacheProjectionImpl<K, V> prj = projectionPerCall(); + return subjectIdPerCall(subjId, projectionPerCall()); + } + /** + * Gets subject ID per call. + * + * @param subjId Optional already existing subject ID. + * @param prj Optional thread local projection. + * @return Subject ID per call. + */ + public UUID subjectIdPerCall(@Nullable UUID subjId, @Nullable GridCacheProjectionImpl<K, V> prj) { if (prj != null) subjId = prj.subjectId(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java index 76d73b3..2b38247 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java @@ -270,10 +270,11 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { * temporary object can used for filter evaluation or transform closure execution and * should not be returned to user. * @param subjId Subject ID initiated this read. + * @param transformClo Transform closure to record event. * @param taskName Task name. * @param filter Filter to check prior to getting the value. Note that filter check * together with getting the value is an atomic operation. - * @param transformClo Transform closure to record event. + * @param expiryPlc Expiry policy. * @return Cached value. * @throws IgniteCheckedException If loading value failed. * @throws GridCacheEntryRemovedException If entry was removed. @@ -290,7 +291,8 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { UUID subjId, Object transformClo, String taskName, - IgnitePredicate<GridCacheEntry<K, V>>[] filter) + IgnitePredicate<GridCacheEntry<K, V>>[] filter, + @Nullable GridCacheAccessExpiryPolicy expiryPlc) throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index 7f9ff4d..00f7382 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -699,7 +699,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> UUID subjId, Object transformClo, String taskName, - IgnitePredicate<GridCacheEntry<K, V>>[] filter) + IgnitePredicate<GridCacheEntry<K, V>>[] filter, + @Nullable GridCacheAccessExpiryPolicy expirePlc) throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException { cctx.denyOnFlag(LOCAL); @@ -714,7 +715,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> subjId, transformClo, taskName, - filter); + filter, + expirePlc); } /** {@inheritDoc} */ @@ -730,7 +732,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> UUID subjId, Object transformClo, String taskName, - IgnitePredicate<GridCacheEntry<K, V>>[] filter) + IgnitePredicate<GridCacheEntry<K, V>>[] filter, + @Nullable GridCacheAccessExpiryPolicy expiryPlc) throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException { // Disable read-through if there is no store. if (readThrough && !cctx.isStoreEnabled()) @@ -877,6 +880,16 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> // No more notifications. evt = false; } + + if (ret != null && expiryPlc != null) { + long ttl = expiryPlc.ttl(); + + assert ttl >= 0 : ttl; + + updateTtl(ttl); + + expiryPlc.ttlUpdated(key(), getOrMarshalKeyBytes(), version()); + } } if (asyncRefresh && !readThrough && cctx.isStoreEnabled()) { @@ -906,7 +919,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> subjId, transformClo, taskName, - filter); + filter, + expiryPlc); } boolean loadedFromStore = false; @@ -987,7 +1001,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> subjId, transformClo, taskName, - filter); + filter, + expiryPlc); } /** {@inheritDoc} */ @@ -1658,6 +1673,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> return toTtl(duration); } + /** + * @param duration Duration. + * @return TTL. + */ public static long toTtl(Duration duration) { if (duration == null) return -1; @@ -1826,17 +1845,19 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> boolean pass = cctx.isAll(wrapFilterLocked(), filter); if (!pass) { - if (!isNew() && expiryPlc != null) { + if (hasValueUnlocked() && expiryPlc != null) { Duration duration = expiryPlc.getExpiryForAccess(); - if (duration != null) - updateTtl(toTtl(duration)); + newTtl = toTtl(duration); + + if (newTtl != -1L) + updateTtl(newTtl); } return new GridCacheUpdateAtomicResult<>(false, retval ? old : null, null, - 0L, + newTtl, -1L, null, null, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java index 7a32afa..edf1e92 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java @@ -1166,7 +1166,8 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter /*subjId*/subjId, /**closure name */recordEvt ? F.first(txEntry.transformClosures()) : null, resolveTaskName(), - CU.<K, V>empty()); + CU.<K, V>empty(), + null); try { for (IgniteClosure<V, V> clos : txEntry.transformClosures()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java index 7b6f266..1c96b32 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java @@ -663,8 +663,6 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K expiry.getExpiryForUpdate() : expiry.getExpiryForCreation(); txEntry.ttl(GridCacheMapEntry.toTtl(duration)); - - log.info("Calculated expiry (userCommit), update=" + cached.hasValue() + ", ttl=" + txEntry.ttl() + ", plc=" + expiry); } } @@ -1081,7 +1079,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K CU.subjectId(this, cctx), transformClo, resolveTaskName(), - filter); + filter, + null); if (val != null) { if (!readCommitted()) @@ -1155,7 +1154,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K CU.subjectId(this, cctx), null, resolveTaskName(), - filter); + filter, + null); if (val != null) { V val0 = val; @@ -1501,7 +1501,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K CU.subjectId(GridCacheTxLocalAdapter.this, cctx), transformClo, resolveTaskName(), - filter); + filter, + null); // If value is in cache and passed the filter. if (val != null) { @@ -1844,7 +1845,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K if (optimistic()) { try { - //Should read through if filter is specified. + // Should read through if filter is specified. old = entry.innerGet(this, /*swap*/false, /*read-through*/readThrough, @@ -1856,7 +1857,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K CU.subjectId(this, cctx), transformClo, resolveTaskName(), - CU.<K, V>empty()); + CU.<K, V>empty(), + null); } catch (GridCacheFilterFailedException e) { e.printStackTrace(); @@ -2065,7 +2067,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K CU.subjectId(this, cctx), null, resolveTaskName(), - CU.<K, V>empty()); + CU.<K, V>empty(), + null); } catch (GridCacheFilterFailedException e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java new file mode 100644 index 0000000..71e314e --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java @@ -0,0 +1,285 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.util.direct.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.nio.*; +import java.util.*; + +/** + * + */ +public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { + /** */ + @GridDirectCollection(byte[].class) + private List<byte[]> keysBytes; + + /** Entry keys. */ + @GridToStringInclude + @GridDirectTransient + private List<K> keys; + + /** Entry versions. */ + @GridDirectCollection(GridCacheVersion.class) + private List<GridCacheVersion> vers; + + /** New TTL. */ + private long ttl; + + /** + * Required empty constructor. + */ + public GridCacheTtlUpdateRequest() { + // No-op. + } + + /** + * @param ttl TTL. + */ + public GridCacheTtlUpdateRequest(long ttl) { + assert ttl >= 0 : ttl; + + this.ttl = ttl; + } + + /** + * @param key Key. + * @param keyBytes Key bytes. + * @param ver Version. + */ + public void addEntry(K key, byte[] keyBytes, GridCacheVersion ver) { + if (keys == null) { + keys = new ArrayList<>(); + + keysBytes = new ArrayList<>(); + + vers = new ArrayList<>(); + } + + keys.add(key); + + keysBytes.add(keyBytes); + + vers.add(ver); + } + + /** + * @return Keys. + */ + public List<K> keys() { + return keys; + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) + throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (keys == null && keysBytes != null) + keys = unmarshalCollection(keysBytes, ctx, ldr); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 82; + } + + /** {@inheritDoc} */ + @SuppressWarnings("CloneDoesntCallSuperClone") + @Override public GridTcpCommunicationMessageAdapter clone() { + GridCacheTtlUpdateRequest _clone = new GridCacheTtlUpdateRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 3: + if (keysBytes != null) { + if (commState.it == null) { + if (!commState.putInt(keysBytes.size())) + return false; + + commState.it = keysBytes.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + if (!commState.putByteArray((byte[])commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { + if (!commState.putInt(-1)) + return false; + } + + commState.idx++; + + case 4: + if (!commState.putLong(ttl)) + return false; + + commState.idx++; + + case 5: + if (vers != null) { + if (commState.it == null) { + if (!commState.putInt(vers.size())) + return false; + + commState.it = vers.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + if (!commState.putCacheVersion((GridCacheVersion)commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { + if (!commState.putInt(-1)) + return false; + } + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 3: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = commState.getInt(); + } + + if (commState.readSize >= 0) { + if (keysBytes == null) + keysBytes = new ArrayList<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { + byte[] _val = commState.getByteArray(); + + if (_val == BYTE_ARR_NOT_READ) + return false; + + keysBytes.add((byte[])_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 4: + if (buf.remaining() < 8) + return false; + + ttl = commState.getLong(); + + commState.idx++; + + case 5: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = commState.getInt(); + } + + if (commState.readSize >= 0) { + if (vers == null) + vers = new ArrayList<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { + GridCacheVersion _val = commState.getCacheVersion(); + + if (_val == CACHE_VER_NOT_READ) + return false; + + vers.add((GridCacheVersion)_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridCacheTtlUpdateRequest _clone = (GridCacheTtlUpdateRequest)_msg; + + _clone.keysBytes = keysBytes; + _clone.keys = keys; + _clone.vers = vers; + _clone.ttl = ttl; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheTtlUpdateRequest.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index c13f8f4..3557d17 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -92,6 +92,24 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + super.start(); + + ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest<K, V>>() { + @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest<K, V> req) { + processTtlUpdateRequest(req); + } + }); + } + + /** + * @param req Request. + */ + private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) { + log.info("Ttl update: " + req); + } + + /** {@inheritDoc} */ @Override public void stop() { super.stop(); @@ -470,11 +488,26 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param filter Optional filter. * @return DHT future. */ - public GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> getDhtAsync(UUID reader, long msgId, - LinkedHashMap<? extends K, Boolean> keys, boolean reload, long topVer, @Nullable UUID subjId, - int taskNameHash, boolean deserializePortable, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { - GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, msgId, reader, keys, reload, /*tx*/null, - topVer, filter, subjId, taskNameHash, deserializePortable); + public GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> getDhtAsync(UUID reader, + long msgId, + LinkedHashMap<? extends K, Boolean> keys, + boolean reload, + long topVer, + @Nullable UUID subjId, + int taskNameHash, + boolean deserializePortable, + IgnitePredicate<GridCacheEntry<K, V>>[] filter) { + GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, + msgId, + reader, + keys, + reload, + /*tx*/null, + topVer, + filter, + subjId, + taskNameHash, + deserializePortable); fut.init(); @@ -489,13 +522,22 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap assert isAffinityNode(cacheCfg); IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut = - getDhtAsync(nodeId, req.messageId(), req.keys(), req.reload(), req.topologyVersion(), req.subjectId(), - req.taskNameHash(), false, req.filter()); + getDhtAsync(nodeId, + req.messageId(), + req.keys(), + req.reload(), + req.topologyVersion(), + req.subjectId(), + req.taskNameHash(), + false, + req.filter()); fut.listenAsync(new CI1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>>() { @Override public void apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> f) { GridNearGetResponse<K, V> res = new GridNearGetResponse<>(ctx.cacheId(), - req.futureId(), req.miniId(), req.version()); + req.futureId(), + req.miniId(), + req.version()); GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut = (GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>>)f; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 7cee7d9..dfbe4fa 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -971,7 +971,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach CU.subjectId(tx, ctx.shared()), null, tx != null ? tx.resolveTaskName() : null, - CU.<K, V>empty()); + CU.<K, V>empty(), + null); assert e.lockedBy(mappedVer) || (ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) : http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index f79bc2f..5f6af05 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -293,8 +293,15 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M // If this is the primary or backup node for the keys. if (n.isLocal()) { final GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut = - cache().getDhtAsync(n.id(), -1, mappedKeys, reload, topVer, subjId, - taskName == null ? 0 : taskName.hashCode(), deserializePortable, filters); + cache().getDhtAsync(n.id(), + -1, + mappedKeys, + reload, + topVer, + subjId, + taskName == null ? 0 : taskName.hashCode(), + deserializePortable, + filters); final Collection<Integer> invalidParts = fut.invalidPartitions(); @@ -405,7 +412,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M subjId, null, taskName, - filters); + filters, + null); colocated.context().evicts().touch(entry, topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d1deed4..6287e16 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -16,6 +16,7 @@ import org.apache.ignite.plugin.security.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.managers.communication.*; import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.preloader.*; import org.gridgain.grid.kernal.processors.cache.distributed.near.*; @@ -141,6 +142,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings({"IfMayBeConditional", "SimplifiableIfStatement"}) @Override public void start() throws IgniteCheckedException { + super.start(); + resetMetrics(); preldr = new GridDhtPreloader<>(ctx); @@ -258,13 +261,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final boolean deserializePortable, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter ) { - subjId = ctx.subjectIdPerCall(subjId); + GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); + + subjId = ctx.subjectIdPerCall(null, prj); final UUID subjId0 = subjId; + final ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null; + return asyncOp(new CO<IgniteFuture<Map<K, V>>>() { @Override public IgniteFuture<Map<K, V>> apply() { - return getAllAsync0(keys, false, forcePrimary, filter, subjId0, taskName, deserializePortable); + return getAllAsync0(keys, + false, + forcePrimary, + filter, + expiryPlc, + subjId0, + taskName, + deserializePortable); } }); } @@ -595,7 +609,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); - UUID subjId = ctx.subjectIdPerCall(null); // TODO IGNITE-41. + UUID subjId = ctx.subjectIdPerCall(null, prj); int taskNameHash = ctx.kernalContext().job().currentTaskNameHash(); @@ -691,10 +705,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param reload Reload flag. * @param forcePrimary Force primary flag. * @param filter Filter. + * @param expiryPlc Expiry policy. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. * @return Get future. */ - private IgniteFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys, boolean reload, - boolean forcePrimary, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, UUID subjId, String taskName, + private IgniteFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys, + boolean reload, + boolean forcePrimary, + @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, + @Nullable ExpiryPolicy expiryPlc, + UUID subjId, + String taskName, boolean deserializePortable) { ctx.checkSecurity(GridSecurityPermission.CACHE_READ); @@ -712,6 +735,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean success = true; + final GridCacheAccessExpiryPolicy expiry = + GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); + // Optimistically expect that all keys are available locally (avoid creation of get future). for (K key : keys) { GridCacheEntryEx<K, V> entry = null; @@ -735,7 +761,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { subjId, null, taskName, - filter); + filter, + expiry); // Entry was not in memory or in swap, so we remove it from cache. if (v == null) { @@ -785,13 +812,42 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { break; } - if (success) + if (success) { + if (expiry != null && expiry.request() != null) { + ctx.closures().runLocalSafe(new Runnable() { + @Override public void run() { + try { + GridCacheTtlUpdateRequest<K, V> req = expiry.request(); + + assert !F.isEmpty(req.keys()); + + Collection<ClusterNode> nodes = ctx.affinity().remoteNodes(req.keys(), -1); + + req.cacheId(ctx.cacheId()); + + ctx.io().safeSend(nodes, req, null); + } + catch (IgniteCheckedException e) { + log.error("Failed to send TTL update request.", e); + } + } + }); + } + return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals)); + } } // Either reload or not all values are available locally. - GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, keys, topVer, reload, forcePrimary, - filter, subjId, taskName, deserializePortable); + GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, + keys, + topVer, + reload, + forcePrimary, + filter, + subjId, + taskName, + deserializePortable); fut.init(); @@ -1066,7 +1122,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), transform, taskName, - CU.<K, V>empty()); + CU.<K, V>empty(), + null); if (transformMap == null) transformMap = new HashMap<>(); @@ -1174,7 +1231,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), null, taskName, - CU.<K, V>empty()); + CU.<K, V>empty(), + null); updated = (V)ctx.config().getInterceptor().onBeforePut(entry.key(), old, updated); @@ -1207,7 +1265,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), null, taskName, - CU.<K, V>empty()); + CU.<K, V>empty(), + null); IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove( entry.key(), old); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 8e9f808..1052e1d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -207,11 +207,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param keys Keys to load. * @param reload Reload flag. * @param forcePrimary Force get from primary node flag. + * @param topVer Topology version. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. * @param filter Filter. * @return Loaded values. */ - public IgniteFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys, boolean reload, - boolean forcePrimary, long topVer, @Nullable UUID subjId, String taskName, boolean deserializePortable, + public IgniteFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys, + boolean reload, + boolean forcePrimary, + long topVer, + @Nullable UUID subjId, + String taskName, + boolean deserializePortable, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { if (keys == null || keys.isEmpty()) return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); @@ -248,7 +257,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte subjId, null, taskName, - filter); + filter, + null); // Entry was not in memory or in swap, so we remove it from cache. if (v == null) { @@ -301,8 +311,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } // Either reload or not all values are available locally. - GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, keys, topVer, reload, forcePrimary, - filter, subjId, taskName, deserializePortable); + GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, + keys, + topVer, + reload, + forcePrimary, + filter, + subjId, + taskName, + deserializePortable); fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java index b785103..db1a058 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -259,8 +259,15 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda GridCacheTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (GridCacheTxLocalEx<K, V>)tx : null; - GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, keys, reload, forcePrimary, txx, filter, - subjId, taskName, deserializePortable); + GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, + keys, + reload, + forcePrimary, + txx, + filter, + subjId, + taskName, + deserializePortable); // init() will register future for responses if future has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java index 6e1f494..1f1de06 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java @@ -250,7 +250,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma * @param mapped Mappings to check for duplicates. * @param topVer Topology version to map on. */ - private void map(Collection<? extends K> keys, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, final long topVer) { + private void map(Collection<? extends K> keys, + Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, + final long topVer) { Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer); if (affNodes.isEmpty()) { @@ -402,7 +404,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma subjId, null, taskName, - filters); + filters, + null); ClusterNode primary = null; @@ -427,7 +430,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma subjId, null, taskName, - filters); + filters, + null); // Entry was not in memory or in swap, so we remove it from cache. if (v == null && isNew && entry.markObsoleteIfEmpty(ver)) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java index 433d199..6eda650 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -607,7 +607,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { subjId, null, taskName, - filter); + filter, + null); if (v != null) vals.put(key, v); @@ -924,7 +925,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { subjId, transform, taskName, - CU.<K, V>empty()); + CU.<K, V>empty(), + null); V updated = transform.apply(old); @@ -1004,7 +1006,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { subjId, null, taskName, - CU.<K, V>empty()); + CU.<K, V>empty(), + null); val = ctx.config().getInterceptor().onBeforePut(entry.key(), old, val); @@ -1034,7 +1037,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { subjId, null, taskName, - CU.<K, V>empty()); + CU.<K, V>empty(), + null); IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove( entry.key(), old); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java index 7c92065..b2ae55b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java @@ -37,7 +37,7 @@ import java.util.*; */ public class GridTcpCommunicationMessageFactory { /** Common message producers. */ - private static final GridTcpCommunicationMessageProducer[] COMMON = new GridTcpCommunicationMessageProducer[82]; + private static final GridTcpCommunicationMessageProducer[] COMMON = new GridTcpCommunicationMessageProducer[83]; /** * Custom messages registry. Used for test purposes. @@ -264,6 +264,9 @@ public class GridTcpCommunicationMessageFactory { case 81: return new GridJobExecuteRequestV2(); + case 82: + return new GridCacheTtlUpdateRequest(); + default: assert false : "Invalid message type."; @@ -274,7 +277,7 @@ public class GridTcpCommunicationMessageFactory { 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, /* 65-72 - GGFS messages. */ 73, 74, 75, 76, 77, 78, 79, - 80, 81); + 80, 81, 82); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index c8abd0e..93b0405 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -125,6 +125,37 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs /** * @throws Exception If failed. */ + public void testAccess() throws Exception { + factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, 62_000L)); + + startGrids(); + + for (final Integer key : keys()) { + log.info("Test access [key=" + key + ']'); + + access(key); + } + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void access(Integer key) throws Exception { + IgniteCache<Integer, Integer> cache = jcache(); + + cache.put(key, 1); + + checkTtl(key, 60_000L); + + assertEquals((Integer)1, cache.get(key)); + + checkTtl(key, 62_000L); + } + + /** + * @throws Exception If failed. + */ public void testCreateUpdate() throws Exception { factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); @@ -299,83 +330,6 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs checkTtl(key, 0L); } - public void _testPrimary() throws Exception { - factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); - - nearCache = true; - - boolean inTx = true; - - startGrids(); - - IgniteCache<Integer, Integer> cache = jcache(0); - - GridCache<Integer, Object> cache0 = cache(0); - - Integer key = primaryKey(cache0); - - log.info("Create: " + key); - - GridCacheTx tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null; - - cache.put(key, 1); - - if (tx != null) - tx.commit(); - - checkTtl(key, 60_000); - - tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null; - - log.info("Update: " + key); - - cache.put(key, 2); - - if (tx != null) - tx.commit(); - - checkTtl(key, 61_000); - } - - /** - * @throws Exception If failed. - */ - public void _test1() throws Exception { - factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); - - nearCache = false; - - boolean inTx = true; - - startGrids(); - - Collection<Integer> keys = keys(); - - IgniteCache<Integer, Integer> cache = jcache(0); - - for (final Integer key : keys) { - log.info("Test key1: " + key); - - GridCacheTx tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null; - - cache.put(key, 1); - - if (tx != null) - tx.commit(); - - log.info("Test key2: " + key); - - tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null; - - cache.put(key, 2); - - if (tx != null) - tx.commit(); - - log.info("Done"); - } - } - /** * @param key Key. * @param txConcurrency Not null transaction concurrency mode if explicit transaction should be started. @@ -436,7 +390,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs * @return Transaction. */ @Nullable private GridCacheTx startTx(@Nullable GridCacheTxConcurrency txConcurrency) { - return txConcurrency == null ? null : ignite(0).transactions().txStart(txConcurrency, READ_COMMITTED); + return txConcurrency == null ? null : ignite(0).transactions().txStart(txConcurrency, REPEATABLE_READ); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java index fd2d205..7f25b3f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java @@ -16,7 +16,7 @@ import junit.framework.*; */ public class IgniteCacheExpiryPolicyTestSuite extends TestSuite { /** - * @return Cache API test suite. + * @return Cache Expiry Policy test suite. * @throws Exception If failed. */ public static TestSuite suite() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java index 16e5b25..bbce8ca 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java @@ -402,7 +402,8 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme UUID subjId, Object transformClo, String taskName, - IgnitePredicate<GridCacheEntry<K, V>>[] filter) { + IgnitePredicate<GridCacheEntry<K, V>>[] filter, + @Nullable GridCacheAccessExpiryPolicy expiryPlc) { return val; }