#ignite-614: wip.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dcac35ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dcac35ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dcac35ce Branch: refs/heads/ignite-614 Commit: dcac35ce2662ef82eb62265f2b5ed7dd19d9ca46 Parents: 87c1275 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu May 14 16:08:12 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu May 14 16:08:12 2015 +0300 ---------------------------------------------------------------------- .../GridCacheEntryInfoCollectSwapListener.java | 68 +++++++++++--------- .../processors/cache/GridCacheSwapListener.java | 4 +- .../processors/cache/GridCacheSwapManager.java | 11 ++-- .../preloader/GridDhtPartitionSupplyPool.java | 1 + 4 files changed, 46 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java index bd1746d..9a7511c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.client.util.*; import org.jsr166.*; import java.util.*; @@ -30,11 +32,14 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe /** */ private final Map<KeyCacheObject, GridCacheEntryInfo> swappedEntries = new ConcurrentHashMap8<>(); - private final ConcurrentHashMap8<KeyCacheObject, GridCacheEntryInfo> notFinishedSwappedEntries = new ConcurrentHashMap8<>(); + /** Entries in swapping. */ + private final GridConcurrentHashSet<KeyCacheObject> swappingKeys = new GridConcurrentHashSet(); + /** Lock for empty condition. */ + final Lock emptyLock = new ReentrantLock(); - final Lock lock = new ReentrantLock(); - final Condition emptyCond = lock.newCondition(); + /** Condition for empty swapping entries. */ + final Condition emptyCond = emptyLock.newCondition(); /** */ private final IgniteLogger log; @@ -49,27 +54,37 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe /** * Wait until all entries finish unswapping. */ - public void waitUnswapFinished() { - lock.lock(); - try{ - if (notFinishedSwappedEntries.size() != 0) - try { - emptyCond.await(); - } - catch (InterruptedException e) { - // No-op. - } - } finally { - lock.unlock(); + public void waitUnswapFinished() throws IgniteCheckedException { + emptyLock.lock(); + + try { + if (swappingKeys.size() != 0) + emptyCond.await(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } + finally { + emptyLock.unlock(); } } /** {@inheritDoc} */ - @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry swapEntry) throws IgniteCheckedException { + @Override public void onEntryUnswapping(KeyCacheObject key) throws IgniteCheckedException { if (log.isDebugEnabled()) - log.debug("Received unswapped event for key: " + key); + log.debug("Received unswapping event for key: " + key); assert key != null; + + swappingKeys.add(key); + } + + /** {@inheritDoc} */ + @Override public void onEntryUnswapped(int part, + KeyCacheObject key, + GridCacheSwapEntry swapEntry) + { + assert key != null; assert swapEntry != null; GridCacheEntryInfo info = new GridCacheEntryInfo(); @@ -80,26 +95,17 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe info.version(swapEntry.version()); info.value(swapEntry.value()); - notFinishedSwappedEntries.put(key, info); - } - - /** {@inheritDoc} */ - @Override public void onEntryUnswapped(int part, - KeyCacheObject key, - GridCacheSwapEntry swapEntry) - { - GridCacheEntryInfo info = notFinishedSwappedEntries.remove(key); + swappedEntries.put(key, info); - assert info != null; + swappingKeys.remove(key); - swappedEntries.put(key, info); + emptyLock.lock(); - lock.lock(); try{ - if (notFinishedSwappedEntries.size() == 0) + if (swappingKeys.size() == 0) emptyCond.signalAll(); } finally { - lock.unlock(); + emptyLock.unlock(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java index d8d0ddc..d6d13ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java @@ -24,12 +24,10 @@ import org.apache.ignite.*; */ public interface GridCacheSwapListener { /** - * @param part Partition. * @param key Cache key. - * @param e Entry. * @throws IgniteCheckedException If failed. */ - public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e) throws IgniteCheckedException; + public void onEntryUnswapping(KeyCacheObject key) throws IgniteCheckedException; /** * @param part Partition. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index fed83de..c179b31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -580,8 +580,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { part, key.valueBytes(cctx.cacheObjectContext())); - for (GridCacheSwapListener lsnr : swapLsnrs.get(part)) - lsnr.onEntryUnswapping(part, key, entry); + Collection<GridCacheSwapListener> lsnrs = swapLsnrs.get(part); + + if (lsnrs != null) { + for (GridCacheSwapListener lsnr : lsnrs) + lsnr.onEntryUnswapping(key); + } swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() { @Override public void apply(byte[] rmv) { @@ -595,7 +599,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { t.set(entry); CacheObject v = entry.value(); - byte[] valBytes = entry.valueBytes(); // Event notification. if (cctx.events().isRecordable(EVT_CACHE_OBJECT_UNSWAPPED)) { @@ -1928,7 +1931,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** {@inheritDoc} */ - @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e) + @Override public void onEntryUnswapping(KeyCacheObject key) throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java index 39fd9ac..3b93c09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java @@ -411,6 +411,7 @@ class GridDhtPartitionSupplyPool<K, V> { // Stop receiving promote notifications. if (swapLsnr != null) { swapLsnr.waitUnswapFinished(); + cctx.swap().removeOffHeapListener(part, swapLsnr); cctx.swap().removeSwapListener(part, swapLsnr); }