ignite-728 Need to reimplement CREATE-TIME-TTL as eviction policy
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ef7d0114 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ef7d0114 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ef7d0114 Branch: refs/heads/ignite-157-2 Commit: ef7d0114c4466eefaff1098c41e5bdb6c3766a28 Parents: 2a68725 Author: agura <ag...@gridgain.com> Authored: Thu Apr 23 21:26:31 2015 +0300 Committer: agura <ag...@gridgain.com> Committed: Tue Apr 28 17:15:45 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheTtlManager.java | 164 ++++++++++++------- .../processors/cache/GridCacheUtils.java | 5 +- .../IgniteCacheEntryListenerAbstractTest.java | 4 +- 3 files changed, 111 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef7d0114/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index 5198b53..d8af2b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -26,7 +26,8 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.thread.*; -import java.util.*; +import org.jetbrains.annotations.*; +import org.jsr166.*; /** * Eagerly removes expired entries from cache when {@link org.apache.ignite.configuration.CacheConfiguration#isEagerTtl()} flag is set. @@ -34,14 +35,11 @@ import java.util.*; @SuppressWarnings("NakedNotify") public class GridCacheTtlManager extends GridCacheManagerAdapter { /** Entries pending removal. */ - private final GridConcurrentSkipListSet<EntryWrapper> pendingEntries = new GridConcurrentSkipListSet<>(); + private final GridConcurrentSkipListSetEx pendingEntries = new GridConcurrentSkipListSetEx(); /** Cleanup worker thread. */ private CleanupWorker cleanupWorker; - /** Sync mutex. */ - private final Object mux = new Object(); - /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl()) @@ -68,24 +66,13 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { * @param entry Entry to add. */ public void addTrackedEntry(GridCacheMapEntry entry) { - EntryWrapper wrapper = new EntryWrapper(entry); - - pendingEntries.add(wrapper); - - // If entry is on the first position, notify waiting thread. - if (wrapper == pendingEntries.firstx()) { - synchronized (mux) { - mux.notifyAll(); - } - } + pendingEntries.add(new EntryWrapper(entry)); } /** * @param entry Entry to remove. */ public void removeTrackedEntry(GridCacheMapEntry entry) { - // Remove must be called while holding lock on entry before updating expire time. - // No need to wake up waiting thread in this case. pendingEntries.remove(new EntryWrapper(entry)); } @@ -97,6 +84,45 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { } /** + * Expires entries by TTL. + * + * @param sizeLimited Size limited. + */ + public void expire(boolean sizeLimited) { + long now = U.currentTimeMillis(); + + GridCacheVersion obsoleteVer = null; + + int size = pendingEntries.sizex(); + + while (!sizeLimited || size-- > 0) { + EntryWrapper e = pendingEntries.pollFirst(); + + if (e == null) + break; + + if (e.expireTime > now) { + pendingEntries.add(e); + + break; + } + + if (obsoleteVer == null) + obsoleteVer = cctx.versions().next(); + + if (log.isDebugEnabled()) + log.debug("Trying to remove expired entry from cache: " + e); + + if (e.entry.onTtlExpired(obsoleteVer)) { + e.entry.context().cache().removeEntry(e.entry); + + if (e.entry.context().cache().configuration().isStatisticsEnabled()) + e.entry.context().cache().metrics0().onEvict(); + } + } + } + + /** * Entry cleanup worker. */ private class CleanupWorker extends GridWorker { @@ -110,52 +136,18 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { while (!isCancelled()) { - long now = U.currentTimeMillis(); - - GridCacheVersion obsoleteVer = null; - - for (Iterator<EntryWrapper> it = pendingEntries.iterator(); it.hasNext(); ) { - EntryWrapper wrapper = it.next(); - - if (wrapper.expireTime <= now) { - if (log.isDebugEnabled()) - log.debug("Trying to remove expired entry from cache: " + wrapper); - - if (obsoleteVer == null) - obsoleteVer = cctx.versions().next(); - - if (wrapper.entry.onTtlExpired(obsoleteVer)) - wrapper.entry.context().cache().removeEntry(wrapper.entry); + expire(false); - if (wrapper.entry.context().cache().configuration().isStatisticsEnabled()) - wrapper.entry.context().cache().metrics0().onEvict(); + EntryWrapper first = pendingEntries.firstx(); - it.remove(); - } - else - break; - } + if (first != null) { + long waitTime = first.expireTime - U.currentTimeMillis(); - synchronized (mux) { - while (true) { - // Access of the first element must be inside of - // synchronization block, so we don't miss out - // on thread notification events sent from - // 'addTrackedEntry(..)' method. - EntryWrapper first = pendingEntries.firstx(); - - if (first != null) { - long waitTime = first.expireTime - U.currentTimeMillis(); - - if (waitTime > 0) - mux.wait(waitTime); - else - break; - } - else - mux.wait(5000); - } + if (waitTime > 0) + U.sleep(waitTime); } + else + U.sleep(500); } } } @@ -214,4 +206,58 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { return res; } } + + /** + * Provides additional method {@code #sizex()}. NOTE: Only the following methods supports this addition: + * <ul> + * <li>{@code #add()}</li> + * <li>{@code #remove()}</li> + * <li>{@code #pollFirst()}</li> + * <ul/> + */ + private static class GridConcurrentSkipListSetEx extends GridConcurrentSkipListSet<EntryWrapper> { + /** */ + private static final long serialVersionUID = 0L; + + /** Size. */ + private final LongAdder8 size = new LongAdder8(); + + /** + * @return Size based on performed operations. + */ + public int sizex() { + return size.intValue(); + } + + /** {@inheritDoc} */ + @Override public boolean add(EntryWrapper e) { + boolean res = super.add(e); + + assert res; + + size.increment(); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean remove(Object o) { + boolean res = super.remove(o); + + if (res) + size.decrement(); + + return res; + } + + /** {@inheritDoc} */ + @Nullable @Override public EntryWrapper pollFirst() { + EntryWrapper e = super.pollFirst(); + + if (e != null) + size.decrement(); + + return e; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef7d0114/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index e7c7f9d..a0e45e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1157,6 +1157,8 @@ public class GridCacheUtils { if (ctx.isNear()) ctx.near().dht().context().evicts().unwind(); + + ctx.ttl().expire(true); } /** @@ -1166,11 +1168,12 @@ public class GridCacheUtils { assert ctx != null; for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts()) { - cacheCtx.evicts().unwind(); if (cacheCtx.isNear()) cacheCtx.near().dht().context().evicts().unwind(); + + cacheCtx.ttl().expire(true); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef7d0114/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index a873bb0..544fe6c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -77,7 +77,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb super.afterTest(); for (int i = 0; i < gridCount(); i++) { - GridContinuousProcessor proc = ((IgniteKernal)grid(i)).context().continuous(); + GridContinuousProcessor proc = grid(i).context().continuous(); ConcurrentMap<?, ?> syncMsgFuts = GridTestUtils.getFieldValue(proc, "syncMsgFuts"); @@ -712,7 +712,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb expirePlcCache.put(key, 10); - U.sleep(200); + U.sleep(500); if (!eagerTtl()) assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled.