Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-45 8c082029a -> 489d1b834


IGNITE-45 - Fixed potential memory leak.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/489d1b83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/489d1b83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/489d1b83

Branch: refs/heads/ignite-45
Commit: 489d1b834ff7895951d0f96e549ca4ceab6791dd
Parents: 8c08202
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Wed Mar 18 13:40:23 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Wed Mar 18 13:40:23 2015 -0700

----------------------------------------------------------------------
 .../affinity/GridAffinityAssignmentCache.java    |  4 ++--
 .../cache/GridCacheAffinityManager.java          |  2 +-
 .../cache/GridCachePartitionExchangeManager.java | 19 +++++++++++++++++--
 .../GridDhtPartitionsExchangeFuture.java         |  9 +--------
 .../query/GridCacheDistributedQueryManager.java  | 18 ++++++++++++++----
 .../cache/query/GridCacheQueryManager.java       | 14 +++++++++++---
 6 files changed, 46 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/489d1b83/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 2ff00db..801e2a6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -227,13 +227,13 @@ public class GridAffinityAssignmentCache {
      *
      * @param topVer Actual topology version, older versions will be removed.
      */
-    public void cleanUpCache(long topVer) {
+    public void cleanUpCache(AffinityTopologyVersion topVer) {
         if (log.isDebugEnabled())
             log.debug("Cleaning up cache for version [locNodeId=" + 
ctx.localNodeId() +
                 ", topVer=" + topVer + ']');
 
         for (Iterator<AffinityTopologyVersion> it = 
affCache.keySet().iterator(); it.hasNext(); )
-            if (it.next().topologyVersion() < topVer)
+            if (it.next().compareTo(topVer) < 0)
                 it.remove();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/489d1b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 73128d6..4b8b294 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -143,7 +143,7 @@ public class GridCacheAffinityManager extends 
GridCacheManagerAdapter {
      *
      * @param topVer Actual topology version, older versions will be removed.
      */
-    public void cleanUpCache(long topVer) {
+    public void cleanUpCache(AffinityTopologyVersion topVer) {
         assert !cctx.isLocal();
 
         aff.cleanUpCache(topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/489d1b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 78b009b..36bafdf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -58,6 +58,9 @@ public class GridCachePartitionExchangeManager<K, V> extends 
GridCacheSharedMana
     /** Exchange history size. */
     private static final int EXCHANGE_HISTORY_SIZE = 1000;
 
+    /** Cleanup history size. */
+    public static final int EXCH_FUT_CLEANUP_HISTORY_SIZE = 10;
+
     /** Atomic reference for pending timeout object. */
     private AtomicReference<ResendTimeoutObject> pendingResend = new 
AtomicReference<>();
 
@@ -638,12 +641,24 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /**
      * @param exchFut Exchange.
      */
-    public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut) {
+    public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, 
Throwable err) {
         ExchangeFutureSet exchFuts0 = exchFuts;
 
         if (exchFuts0 != null) {
+            int skipped = 0;
+
             for (GridDhtPartitionsExchangeFuture fut : exchFuts0.values()) {
-                if (fut.exchangeId().topologyVersion().topologyVersion() < 
exchFut.exchangeId().topologyVersion().topologyVersion() - 10)
+                skipped++;
+
+                if (skipped == EXCH_FUT_CLEANUP_HISTORY_SIZE) {
+                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                        if (err == null) {
+                            if (!cacheCtx.isLocal())
+                                
cacheCtx.affinity().cleanUpCache(fut.topologyVersion());
+                        }
+                    }
+                }
+                if (skipped > 10)
                     fut.cleanUp();
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/489d1b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index dff7dd0..56af218 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -705,16 +705,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
 
     /** {@inheritDoc} */
     @Override public boolean onDone(AffinityTopologyVersion res, Throwable 
err) {
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (err == null) {
-                if (!cacheCtx.isLocal())
-                    cacheCtx.affinity().cleanUpCache(res.topologyVersion() - 
10);
-            }
-        }
-
         cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs);
 
-        cctx.exchange().onExchangeDone(this);
+        cctx.exchange().onExchangeDone(this, err);
 
         if (super.onDone(res, err) && !dummy && !forcePreload) {
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/489d1b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index d5a0608..27de8fc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -18,8 +18,6 @@
 package org.apache.ignite.internal.processors.cache.query;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
@@ -77,6 +75,9 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
         }
     };
 
+    /** Event listener. */
+    private GridLocalEventListener lsnr;
+
     /** {@inheritDoc} */
     @Override public void start0() throws IgniteCheckedException {
         super.start0();
@@ -89,14 +90,23 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
             }
         });
 
-        cctx.events().addListener(new GridLocalEventListener() {
+        lsnr = new GridLocalEventListener() {
             @Override public void onEvent(Event evt) {
                 DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
 
                 for (GridCacheDistributedQueryFuture fut : futs.values())
                     fut.onNodeLeft(discoEvt.eventNode().id());
             }
-        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+        };
+
+        cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onKernalStop0(boolean cancel) {
+        super.onKernalStop0(cancel);
+
+        cctx.events().removeListener(lsnr);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/489d1b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index d381166..3d05b1a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -88,13 +88,16 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
     /** */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
+    /** Event listener. */
+    private GridLocalEventListener lsnr;
+
     /** {@inheritDoc} */
     @Override public void start0() throws IgniteCheckedException {
         qryProc = cctx.kernalContext().query();
         space = cctx.name();
         maxIterCnt = MAX_ITERATORS;
 
-        cctx.events().addListener(new GridLocalEventListener() {
+        lsnr = new GridLocalEventListener() {
             @Override public void onEvent(Event evt) {
                 UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
@@ -106,7 +109,8 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
                         entry.getValue().listen(new 
CIX1<IgniteInternalFuture<QueryResult<K, V>>>() {
                             @Override
-                            public void 
applyx(IgniteInternalFuture<QueryResult<K, V>> f) throws IgniteCheckedException 
{
+                            public void 
applyx(IgniteInternalFuture<QueryResult<K, V>> f)
+                                throws IgniteCheckedException {
                                 f.get().closeIfNotShared(recipient);
                             }
                         });
@@ -128,13 +132,17 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                     }
                 }
             }
-        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+        };
+
+        cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
 
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
         busyLock.block();
 
+        cctx.events().removeListener(lsnr);
+
         if (cancel)
             onCancelAtStop();
         else

Reply via email to