Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 8c082029a -> 489d1b834
IGNITE-45 - Fixed potential memory leak. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/489d1b83 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/489d1b83 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/489d1b83 Branch: refs/heads/ignite-45 Commit: 489d1b834ff7895951d0f96e549ca4ceab6791dd Parents: 8c08202 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Wed Mar 18 13:40:23 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Wed Mar 18 13:40:23 2015 -0700 ---------------------------------------------------------------------- .../affinity/GridAffinityAssignmentCache.java | 4 ++-- .../cache/GridCacheAffinityManager.java | 2 +- .../cache/GridCachePartitionExchangeManager.java | 19 +++++++++++++++++-- .../GridDhtPartitionsExchangeFuture.java | 9 +-------- .../query/GridCacheDistributedQueryManager.java | 18 ++++++++++++++---- .../cache/query/GridCacheQueryManager.java | 14 +++++++++++--- 6 files changed, 46 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/489d1b83/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 2ff00db..801e2a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -227,13 +227,13 @@ public class GridAffinityAssignmentCache { * * @param topVer Actual topology version, older versions will be removed. */ - public void cleanUpCache(long topVer) { + public void cleanUpCache(AffinityTopologyVersion topVer) { if (log.isDebugEnabled()) log.debug("Cleaning up cache for version [locNodeId=" + ctx.localNodeId() + ", topVer=" + topVer + ']'); for (Iterator<AffinityTopologyVersion> it = affCache.keySet().iterator(); it.hasNext(); ) - if (it.next().topologyVersion() < topVer) + if (it.next().compareTo(topVer) < 0) it.remove(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/489d1b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 73128d6..4b8b294 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -143,7 +143,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * * @param topVer Actual topology version, older versions will be removed. */ - public void cleanUpCache(long topVer) { + public void cleanUpCache(AffinityTopologyVersion topVer) { assert !cctx.isLocal(); aff.cleanUpCache(topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/489d1b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 78b009b..36bafdf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -58,6 +58,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** Exchange history size. */ private static final int EXCHANGE_HISTORY_SIZE = 1000; + /** Cleanup history size. */ + public static final int EXCH_FUT_CLEANUP_HISTORY_SIZE = 10; + /** Atomic reference for pending timeout object. */ private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>(); @@ -638,12 +641,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param exchFut Exchange. */ - public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut) { + public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, Throwable err) { ExchangeFutureSet exchFuts0 = exchFuts; if (exchFuts0 != null) { + int skipped = 0; + for (GridDhtPartitionsExchangeFuture fut : exchFuts0.values()) { - if (fut.exchangeId().topologyVersion().topologyVersion() < exchFut.exchangeId().topologyVersion().topologyVersion() - 10) + skipped++; + + if (skipped == EXCH_FUT_CLEANUP_HISTORY_SIZE) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (err == null) { + if (!cacheCtx.isLocal()) + cacheCtx.affinity().cleanUpCache(fut.topologyVersion()); + } + } + } + if (skipped > 10) fut.cleanUp(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/489d1b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index dff7dd0..56af218 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -705,16 +705,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** {@inheritDoc} */ @Override public boolean onDone(AffinityTopologyVersion res, Throwable err) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (err == null) { - if (!cacheCtx.isLocal()) - cacheCtx.affinity().cleanUpCache(res.topologyVersion() - 10); - } - } - cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs); - cctx.exchange().onExchangeDone(this); + cctx.exchange().onExchangeDone(this, err); if (super.onDone(res, err) && !dummy && !forcePreload) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/489d1b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index d5a0608..27de8fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -18,8 +18,6 @@ package org.apache.ignite.internal.processors.cache.query; import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; @@ -77,6 +75,9 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage } }; + /** Event listener. */ + private GridLocalEventListener lsnr; + /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { super.start0(); @@ -89,14 +90,23 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage } }); - cctx.events().addListener(new GridLocalEventListener() { + lsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { DiscoveryEvent discoEvt = (DiscoveryEvent)evt; for (GridCacheDistributedQueryFuture fut : futs.values()) fut.onNodeLeft(discoEvt.eventNode().id()); } - }, EVT_NODE_LEFT, EVT_NODE_FAILED); + }; + + cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); + } + + /** {@inheritDoc} */ + @Override protected void onKernalStop0(boolean cancel) { + super.onKernalStop0(cancel); + + cctx.events().removeListener(lsnr); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/489d1b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index d381166..3d05b1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -88,13 +88,16 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + /** Event listener. */ + private GridLocalEventListener lsnr; + /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { qryProc = cctx.kernalContext().query(); space = cctx.name(); maxIterCnt = MAX_ITERATORS; - cctx.events().addListener(new GridLocalEventListener() { + lsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); @@ -106,7 +109,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() { @Override - public void applyx(IgniteInternalFuture<QueryResult<K, V>> f) throws IgniteCheckedException { + public void applyx(IgniteInternalFuture<QueryResult<K, V>> f) + throws IgniteCheckedException { f.get().closeIfNotShared(recipient); } }); @@ -128,13 +132,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } } } - }, EVT_NODE_LEFT, EVT_NODE_FAILED); + }; + + cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); } /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { busyLock.block(); + cctx.events().removeListener(lsnr); + if (cancel) onCancelAtStop(); else