#IGNITE-86: wip
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/42d94fe2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/42d94fe2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/42d94fe2 Branch: refs/heads/ignite-sql Commit: 42d94fe229477d9048d9b389a2472ff6f6ff2596 Parents: 5819493 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Fri Jan 23 15:48:07 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Fri Jan 23 15:48:07 2015 +0300 ---------------------------------------------------------------------- .../cache/GridCacheDeploymentManager.java | 141 ++++++++++--------- .../cache/GridCachePreloaderAdapter.java | 2 +- .../preloader/GridDhtPartitionDemandPool.java | 2 +- 3 files changed, 74 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42d94fe2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDeploymentManager.java index 39d998e..5d9759f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDeploymentManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDeploymentManager.java @@ -53,7 +53,8 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap private volatile ClassLoader globalLdr; /** Undeploys. */ - private final ConcurrentLinkedQueue<CA> undeploys = new ConcurrentLinkedQueue<>(); + private final ConcurrentHashMap8<GridCacheContext, ConcurrentLinkedQueue<CA>> undeploys + = new ConcurrentHashMap8<>(); /** Per-thread deployment context. */ private ConcurrentMap<IgniteUuid, CachedDeploymentInfo<K, V>> deps = @@ -178,10 +179,10 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap /** * Undeploy all queued up closures. */ - public void unwind() { + public void unwind(GridCacheContext ctx) { int cnt = 0; - for (CA c = undeploys.poll(); c != null; c = undeploys.poll()) { + for (CA c = undeploys.get(ctx).poll(); c != null; c = undeploys.get(ctx).poll()) { c.apply(); cnt++; @@ -202,11 +203,16 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap if (log.isDebugEnabled()) log.debug("Received onUndeploy() request [ldr=" + ldr + ", cctx=" + cctx + ']'); - undeploys.add(new CA() { - @Override public void apply() { - onUndeploy0(ldr); - } - }); + for (final GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + undeploys.putIfAbsent(cacheCtx, new ConcurrentLinkedQueue<CA>()); + + undeploys.get(cacheCtx).add(new CA() { + @Override + public void apply() { + onUndeploy0(ldr, cacheCtx); + } + }); + } for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { // Unwind immediately for local and replicate caches. @@ -219,86 +225,83 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap /** * @param ldr Loader. */ - private void onUndeploy0(final ClassLoader ldr) { - for (final GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { - GridCacheAdapter<K, V> cache = cacheCtx.cache(); - - Set<K> keySet = cache.keySet(cacheCtx.vararg( - new P1<GridCacheEntry<K, V>>() { - @Override public boolean apply(GridCacheEntry<K, V> e) { - return cacheCtx.isNear() ? undeploy(e, cacheCtx.near()) || undeploy(e, cacheCtx.near().dht()) : - undeploy(e, cacheCtx.cache()); - } - - /** - * @param e Entry. - * @param cache Cache. - * @return {@code True} if entry should be undeployed. - */ - private boolean undeploy(GridCacheEntry<K, V> e, GridCacheAdapter<K, V> cache) { - K k = e.getKey(); + private void onUndeploy0(final ClassLoader ldr, final GridCacheContext<K, V> cacheCtx) { + GridCacheAdapter<K, V> cache = cacheCtx.cache(); + + Set<K> keySet = cache.keySet(cacheCtx.vararg( + new P1<GridCacheEntry<K, V>>() { + @Override public boolean apply(GridCacheEntry<K, V> e) { + return cacheCtx.isNear() ? undeploy(e, cacheCtx.near()) || undeploy(e, cacheCtx.near().dht()) : + undeploy(e, cacheCtx.cache()); + } - GridCacheEntryEx<K, V> entry = cache.peekEx(e.getKey()); + /** + * @param e Entry. + * @param cache Cache. + * @return {@code True} if entry should be undeployed. + */ + private boolean undeploy(GridCacheEntry<K, V> e, GridCacheAdapter<K, V> cache) { + K k = e.getKey(); - if (entry == null) - return false; + GridCacheEntryEx<K, V> entry = cache.peekEx(e.getKey()); - V v; + if (entry == null) + return false; - try { - v = entry.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty()); - } - catch (GridCacheEntryRemovedException ignore) { - return false; - } - catch (IgniteException ignore) { - // Peek can throw runtime exception if unmarshalling failed. - return true; - } + V v; - assert k != null : "Key cannot be null for cache entry: " + e; + try { + v = entry.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty()); + } + catch (GridCacheEntryRemovedException ignore) { + return false; + } + catch (IgniteException ignore) { + // Peek can throw runtime exception if unmarshalling failed. + return true; + } - ClassLoader keyLdr = U.detectObjectClassLoader(k); - ClassLoader valLdr = U.detectObjectClassLoader(v); + assert k != null : "Key cannot be null for cache entry: " + e; - boolean res = F.eq(ldr, keyLdr) || F.eq(ldr, valLdr); + ClassLoader keyLdr = U.detectObjectClassLoader(k); + ClassLoader valLdr = U.detectObjectClassLoader(v); - if (log.isDebugEnabled()) - log.debug("Finished examining entry [entryCls=" + e.getClass() + - ", key=" + k + ", keyCls=" + k.getClass() + - ", valCls=" + (v != null ? v.getClass() : "null") + - ", keyLdr=" + keyLdr + ", valLdr=" + valLdr + ", res=" + res + ']'); + boolean res = F.eq(ldr, keyLdr) || F.eq(ldr, valLdr); - return res; - } - })); + if (log.isDebugEnabled()) + log.debug("Finished examining entry [entryCls=" + e.getClass() + + ", key=" + k + ", keyCls=" + k.getClass() + + ", valCls=" + (v != null ? v.getClass() : "null") + + ", keyLdr=" + keyLdr + ", valLdr=" + valLdr + ", res=" + res + ']'); - Collection<K> keys = new ArrayList<>(); + return res; + } + })); - for (K k : keySet) - keys.add(k); + Collection<K> keys = new ArrayList<>(); - if (log.isDebugEnabled()) - log.debug("Finished searching keys for undeploy [keysCnt=" + keys.size() + ']'); + for (K k : keySet) + keys.add(k); - cache.clearAll(keys, true); + if (log.isDebugEnabled()) + log.debug("Finished searching keys for undeploy [keysCnt=" + keys.size() + ']'); - if (cacheCtx.isNear()) - cacheCtx.near().dht().clearAll(keys, true); + cache.clearAll(keys, true); - GridCacheQueryManager<K, V> qryMgr = cacheCtx.queries(); + if (cacheCtx.isNear()) + cacheCtx.near().dht().clearAll(keys, true); - if (qryMgr != null) - qryMgr.onUndeploy(ldr); + GridCacheQueryManager<K, V> qryMgr = cacheCtx.queries(); - // Examine swap for entries to undeploy. - int swapUndeployCnt = cacheCtx.isNear() ? - cacheCtx.near().dht().context().swap().onUndeploy(ldr) : - cacheCtx.swap().onUndeploy(ldr); + if (qryMgr != null) + qryMgr.onUndeploy(ldr); - if (cacheCtx.system()) - continue; + // Examine swap for entries to undeploy. + int swapUndeployCnt = cacheCtx.isNear() ? + cacheCtx.near().dht().context().swap().onUndeploy(ldr) : + cacheCtx.swap().onUndeploy(ldr); + if (!cacheCtx.system()) { U.quietAndWarn(log, ""); U.quietAndWarn( log, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42d94fe2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java index 950092e..e574ab6 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java @@ -107,7 +107,7 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V> /** {@inheritDoc} */ @Override public void unwindUndeploys() { - cctx.deploy().unwind(); + cctx.deploy().unwind(cctx); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42d94fe2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 7bbf102..f347450 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -391,7 +391,7 @@ public class GridDhtPartitionDemandPool<K, V> { demandLock.writeLock().lock(); try { - cctx.deploy().unwind(); + cctx.deploy().unwind(cctx); } finally { demandLock.writeLock().unlock();