Repository: incubator-ignite Updated Branches: refs/heads/ignite-238 e9ba67ff5 -> 4fc95f9bf
# ignite-238 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4fc95f9b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4fc95f9b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4fc95f9b Branch: refs/heads/ignite-238 Commit: 4fc95f9bfedcba7aad032172c512ebde75d470da Parents: e9ba67f Author: sboikov <sboi...@gridgain.com> Authored: Fri Feb 13 09:59:01 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Feb 13 09:59:01 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 40 +++++--- .../distributed/dht/GridDhtCacheAdapter.java | 27 +++-- .../dht/atomic/GridDhtAtomicCache.java | 101 +------------------ .../dht/colocated/GridDhtColocatedCache.java | 4 +- .../distributed/near/GridNearCacheAdapter.java | 2 +- .../cache/distributed/near/GridNearTxLocal.java | 19 ++-- .../local/atomic/GridLocalAtomicCache.java | 7 +- .../cache/query/GridCacheQueryManager.java | 4 +- 8 files changed, 51 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4fc95f9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 0cd8886..1c93302 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2043,7 +2043,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, taskName, deserializePortable, forcePrimary, - accessExpiryPolicy(prj != null ? prj.expiry() : null), + expiryPolicy(prj != null ? prj.expiry() : null), filter); } @@ -5609,11 +5609,11 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param plc Explicitly specified expiry policy for cache operation. * @return Expiry policy wrapper. */ - @Nullable public GetExpiryPolicy accessExpiryPolicy(@Nullable ExpiryPolicy plc) { + @Nullable public IgniteCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy plc) { if (plc == null) plc = ctx.expiry(); - return GetExpiryPolicy.forPolicy(plc); + return CacheExpiryPolicy.forPolicy(plc); } /** @@ -5988,7 +5988,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** * */ - protected abstract static class GetExpiryPolicy implements IgniteCacheExpiryPolicy { + protected abstract static class CacheExpiryPolicy implements IgniteCacheExpiryPolicy { /** */ private Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries; @@ -5999,14 +5999,22 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param expiryPlc Expiry policy. * @return Access expire policy. */ - @Nullable public static GetExpiryPolicy forPolicy(@Nullable final ExpiryPolicy expiryPlc) { + @Nullable private static CacheExpiryPolicy forPolicy(@Nullable final ExpiryPolicy expiryPlc) { if (expiryPlc == null) return null; - return new GetExpiryPolicy() { + return new CacheExpiryPolicy() { @Override public long forAccess() { return CU.toTtl(expiryPlc.getExpiryForAccess()); } + + @Override public long forCreate() { + return CU.toTtl(expiryPlc.getExpiryForCreation()); + } + + @Override public long forUpdate() { + return CU.toTtl(expiryPlc.getExpiryForUpdate()); + } }; } @@ -6014,11 +6022,11 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param ttl Access TTL. * @return Access expire policy. */ - @Nullable public static GetExpiryPolicy forTtl(final long ttl) { + @Nullable public static CacheExpiryPolicy forAccess(final long ttl) { if (ttl == CU.TTL_NOT_CHANGED) return null; - return new GetExpiryPolicy() { + return new CacheExpiryPolicy() { @Override public long forAccess() { return ttl; } @@ -6027,16 +6035,16 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public long forCreate() { - return -1L; + return CU.TTL_NOT_CHANGED; } /** {@inheritDoc} */ @Override public long forUpdate() { - return -1L; + return CU.TTL_NOT_CHANGED; } /** {@inheritDoc} */ - @Override public synchronized void reset() { + @Override public void reset() { if (entries != null) entries.clear(); @@ -6050,7 +6058,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param ver Entry version. */ @SuppressWarnings("unchecked") - @Override public synchronized void ttlUpdated(Object key, + @Override public void ttlUpdated(Object key, byte[] keyBytes, GridCacheVersion ver, @Nullable Collection<UUID> rdrs) { @@ -6079,23 +6087,23 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** * @return TTL update request. */ - @Nullable @Override public synchronized Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() { + @Nullable @Override public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() { return entries; } /** {@inheritDoc} */ - @Nullable @Override public synchronized Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> readers() { + @Nullable @Override public Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> readers() { return rdrsMap; } /** {@inheritDoc} */ - @Override public synchronized boolean readyToFlush(int cnt) { + @Override public boolean readyToFlush(int cnt) { return (entries != null && entries.size() > cnt) || (rdrsMap != null && rdrsMap.size() > cnt); } /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GetExpiryPolicy.class, this); + return S.toString(CacheExpiryPolicy.class, this); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4fc95f9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index ba60a29..fbc3bf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -621,20 +621,19 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap long ttl = req.accessTtl(); - final GetExpiryPolicy expiryPlc = GetExpiryPolicy.forTtl(ttl); - - IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>> fut = - getDhtAsync(nodeId, - req.messageId(), - req.keys(), - req.readThrough(), - req.reload(), - req.topologyVersion(), - req.subjectId(), - req.taskNameHash(), - false, - req.filter(), - expiryPlc); + final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl); + + IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>> fut = getDhtAsync(nodeId, + req.messageId(), + req.keys(), + req.readThrough(), + req.reload(), + req.topologyVersion(), + req.subjectId(), + req.taskNameHash(), + false, + req.filter(), + expiryPlc); fut.listenAsync(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>>>() { @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>> f) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4fc95f9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d3a9385..55e7c0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -884,7 +884,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { long topVer = ctx.affinity().affinityTopologyVersion(); - final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc); + final IgniteCacheExpiryPolicy expiry = expiryPolicy(expiryPlc); // Optimisation: try to resolve value locally and escape 'get future' creation. if (!reload && !forcePrimary) { @@ -1101,10 +1101,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean replicate = ctx.isDrEnabled(); - ExpiryPolicy plc = req.expiry() != null ? req.expiry() : ctx.expiry(); - - if (plc != null) - expiry = new UpdateExpiryPolicy(plc); + expiry = expiryPolicy(req.expiry()); if (keys.size() > 1 && // Several keys ... writeThrough() && // and store is enabled ... @@ -2943,98 +2940,4 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { pendingResponses.remove(nodeId, this); } } - - /** - * - */ - private static class UpdateExpiryPolicy implements IgniteCacheExpiryPolicy { - /** */ - private final ExpiryPolicy plc; - - /** */ - private Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries; - - /** */ - private Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> rdrsMap; - - /** - * @param plc Expiry policy. - */ - private UpdateExpiryPolicy(ExpiryPolicy plc) { - assert plc != null; - - this.plc = plc; - } - - /** {@inheritDoc} */ - @Override public long forCreate() { - return toTtl(plc.getExpiryForCreation()); - } - - /** {@inheritDoc} */ - @Override public long forUpdate() { - return toTtl(plc.getExpiryForUpdate()); - } - - /** {@inheritDoc} */ - @Override public long forAccess() { - return toTtl(plc.getExpiryForAccess()); - } - - /** {@inheritDoc} */ - @Override public void ttlUpdated(Object key, - byte[] keyBytes, - GridCacheVersion ver, - @Nullable Collection<UUID> rdrs) { - if (entries == null) - entries = new HashMap<>(); - - IgniteBiTuple<byte[], GridCacheVersion> t = new IgniteBiTuple<>(keyBytes, ver); - - entries.put(key, t); - - if (rdrs != null && !rdrs.isEmpty()) { - if (rdrsMap == null) - rdrsMap = new HashMap<>(); - - for (UUID nodeId : rdrs) { - Collection<IgniteBiTuple<byte[], GridCacheVersion>> col = rdrsMap.get(nodeId); - - if (col == null) - rdrsMap.put(nodeId, col = new ArrayList<>()); - - col.add(t); - } - } - } - - /** {@inheritDoc} */ - @Override public void reset() { - if (entries != null) - entries.clear(); - - if (rdrsMap != null) - rdrsMap.clear(); - } - - /** {@inheritDoc} */ - @Nullable @Override public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() { - return entries; - } - - /** {@inheritDoc} */ - @Nullable @Override public Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> readers() { - return rdrsMap; - } - - /** {@inheritDoc} */ - @Override public boolean readyToFlush(int cnt) { - return (entries != null && entries.size() > cnt) || (rdrsMap != null && rdrsMap.size() > cnt); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(UpdateExpiryPolicy.class, this); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4fc95f9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 3b6aab5..a685595 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -195,7 +195,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte taskName, deserializePortable, filter, - accessExpiryPolicy(prj != null ? prj.expiry() : null)); + expiryPolicy(prj != null ? prj.expiry() : null)); } /** {@inheritDoc} */ @@ -257,7 +257,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte validateCacheKeys(keys); if (expiryPlc == null) - expiryPlc = accessExpiryPolicy(ctx.expiry()); + expiryPlc = expiryPolicy(null); // Optimisation: try to resolve value locally and escape 'get future' creation. if (!reload && !forcePrimary) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4fc95f9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index c5fa33e..4c82a2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -289,7 +289,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda IgniteTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (IgniteTxLocalEx<K, V>)tx : null; - final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc); + final IgniteCacheExpiryPolicy expiry = expiryPolicy(expiryPlc); GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, keys, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4fc95f9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 6fa2f6a..506f063 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -1180,23 +1180,16 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { { assert optimistic(); - if (expiryPlc == null) - expiryPlc = ctx.expiry(); + IgniteCacheExpiryPolicy plc = ctx.cache().expiryPolicy(expiryPlc); - if (expiryPlc != null) { - IgniteCacheExpiryPolicy plc = ctx.cache().accessExpiryPolicy(expiryPlc); + if (plc != null) { + if (accessMap == null) + accessMap = new HashMap<>(); - if (plc != null) { - if (accessMap == null) - accessMap = new HashMap<>(); - - accessMap.put(key, plc); - } - - return plc; + accessMap.put(key, plc); } - return null; + return plc; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4fc95f9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index d9f8e97..a0909b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -578,17 +578,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { UUID subjId = ctx.subjectIdPerCall(null, prj); - ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null; - - if (expiryPlc == null) - expiryPlc = ctx.expiry(); - Map<K, V> vals = new HashMap<>(keys.size(), 1.0f); if (keyCheck) validateCacheKeys(keys); - final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc); + final IgniteCacheExpiryPolicy expiry = expiryPolicy(prj != null ? prj.expiry() : null); boolean success = true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4fc95f9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 7fb9d63..2a4b8cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -765,7 +765,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { private IgniteBiTuple<K, V> next; - private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().accessExpiryPolicy(plc); + private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); private Iterator<K> iter = qry.includeBackups() || cctx.isReplicated() ? prj.keySet().iterator() : prj.primaryKeySet().iterator(); @@ -812,7 +812,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { dht.sendTtlUpdateRequest(expiryPlc); - expiryPlc = cctx.cache().accessExpiryPolicy(plc); + expiryPlc = cctx.cache().expiryPolicy(plc); } if (val != null) {