#ignite-614: wip.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dcac35ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dcac35ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dcac35ce

Branch: refs/heads/ignite-614
Commit: dcac35ce2662ef82eb62265f2b5ed7dd19d9ca46
Parents: 87c1275
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Thu May 14 16:08:12 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Thu May 14 16:08:12 2015 +0300

----------------------------------------------------------------------
 .../GridCacheEntryInfoCollectSwapListener.java  | 68 +++++++++++---------
 .../processors/cache/GridCacheSwapListener.java |  4 +-
 .../processors/cache/GridCacheSwapManager.java  | 11 ++--
 .../preloader/GridDhtPartitionSupplyPool.java   |  1 +
 4 files changed, 46 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
index bd1746d..9a7511c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.client.util.*;
 import org.jsr166.*;
 
 import java.util.*;
@@ -30,11 +32,14 @@ public class GridCacheEntryInfoCollectSwapListener 
implements GridCacheSwapListe
     /** */
     private final Map<KeyCacheObject, GridCacheEntryInfo> swappedEntries = new 
ConcurrentHashMap8<>();
 
-    private final ConcurrentHashMap8<KeyCacheObject, GridCacheEntryInfo>  
notFinishedSwappedEntries = new ConcurrentHashMap8<>();
+    /** Entries in swapping. */
+    private final GridConcurrentHashSet<KeyCacheObject> swappingKeys = new 
GridConcurrentHashSet();
 
+    /** Lock for empty condition. */
+    final Lock emptyLock = new ReentrantLock();
 
-    final Lock lock = new ReentrantLock();
-    final Condition emptyCond  = lock.newCondition();
+    /** Condition for empty swapping entries. */
+    final Condition emptyCond  = emptyLock.newCondition();
 
     /** */
     private final IgniteLogger log;
@@ -49,27 +54,37 @@ public class GridCacheEntryInfoCollectSwapListener 
implements GridCacheSwapListe
     /**
      * Wait until all entries finish unswapping.
      */
-    public void waitUnswapFinished() {
-        lock.lock();
-        try{
-            if (notFinishedSwappedEntries.size() != 0)
-                try {
-                    emptyCond.await();
-                }
-                catch (InterruptedException e) {
-                    // No-op.
-                }
-        } finally {
-            lock.unlock();
+    public void waitUnswapFinished() throws IgniteCheckedException {
+        emptyLock.lock();
+
+        try {
+            if (swappingKeys.size() != 0)
+                emptyCond.await();
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedCheckedException(e);
+        }
+        finally {
+            emptyLock.unlock();
         }
     }
 
     /** {@inheritDoc} */
-    @Override public void onEntryUnswapping(int part, KeyCacheObject key, 
GridCacheSwapEntry swapEntry) throws IgniteCheckedException {
+    @Override public void onEntryUnswapping(KeyCacheObject key) throws 
IgniteCheckedException {
         if (log.isDebugEnabled())
-            log.debug("Received unswapped event for key: " + key);
+            log.debug("Received unswapping event for key: " + key);
 
         assert key != null;
+
+        swappingKeys.add(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onEntryUnswapped(int part,
+        KeyCacheObject key,
+        GridCacheSwapEntry swapEntry)
+    {
+        assert key != null;
         assert swapEntry != null;
 
         GridCacheEntryInfo info = new GridCacheEntryInfo();
@@ -80,26 +95,17 @@ public class GridCacheEntryInfoCollectSwapListener 
implements GridCacheSwapListe
         info.version(swapEntry.version());
         info.value(swapEntry.value());
 
-        notFinishedSwappedEntries.put(key, info);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onEntryUnswapped(int part,
-        KeyCacheObject key,
-        GridCacheSwapEntry swapEntry)
-    {
-        GridCacheEntryInfo info = notFinishedSwappedEntries.remove(key);
+        swappedEntries.put(key, info);
 
-        assert info != null;
+        swappingKeys.remove(key);
 
-        swappedEntries.put(key, info);
+        emptyLock.lock();
 
-        lock.lock();
         try{
-            if (notFinishedSwappedEntries.size() == 0)
+            if (swappingKeys.size() == 0)
                 emptyCond.signalAll();
         } finally {
-            lock.unlock();
+            emptyLock.unlock();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
index d8d0ddc..d6d13ff 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
@@ -24,12 +24,10 @@ import org.apache.ignite.*;
  */
 public interface GridCacheSwapListener {
     /**
-     * @param part Partition.
      * @param key Cache key.
-     * @param e Entry.
      * @throws IgniteCheckedException If failed.
      */
-    public void onEntryUnswapping(int part, KeyCacheObject key, 
GridCacheSwapEntry e) throws IgniteCheckedException;
+    public void onEntryUnswapping(KeyCacheObject key) throws 
IgniteCheckedException;
 
     /**
      * @param part Partition.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index fed83de..c179b31 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -580,8 +580,12 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
             part,
             key.valueBytes(cctx.cacheObjectContext()));
 
-        for (GridCacheSwapListener lsnr : swapLsnrs.get(part))
-            lsnr.onEntryUnswapping(part, key, entry);
+        Collection<GridCacheSwapListener> lsnrs = swapLsnrs.get(part);
+
+        if (lsnrs != null) {
+            for (GridCacheSwapListener lsnr : lsnrs)
+                lsnr.onEntryUnswapping(key);
+        }
 
         swapMgr.remove(spaceName, swapKey,  new CI1<byte[]>() {
             @Override public void apply(byte[] rmv) {
@@ -595,7 +599,6 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
                         t.set(entry);
 
                         CacheObject v = entry.value();
-                        byte[] valBytes = entry.valueBytes();
 
                         // Event notification.
                         if 
(cctx.events().isRecordable(EVT_CACHE_OBJECT_UNSWAPPED)) {
@@ -1928,7 +1931,7 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public void onEntryUnswapping(int part, KeyCacheObject key, 
GridCacheSwapEntry e)
+        @Override public void onEntryUnswapping(KeyCacheObject key)
             throws IgniteCheckedException {
             // No-op.
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 39fd9ac..3b93c09 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -411,6 +411,7 @@ class GridDhtPartitionSupplyPool<K, V> {
                         // Stop receiving promote notifications.
                         if (swapLsnr != null) {
                             swapLsnr.waitUnswapFinished();
+
                             cctx.swap().removeOffHeapListener(part, swapLsnr);
                             cctx.swap().removeSwapListener(part, swapLsnr);
                         }

Reply via email to