#IGNITE-86: merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7cccef78 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7cccef78 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7cccef78 Branch: refs/heads/ingite-9655-merge Commit: 7cccef78212abde41e57bf3b323647e86d5dceba Parents: 117037d Author: ivasilinets <ivasilin...@gridgain.com> Authored: Fri Jan 23 19:28:24 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Fri Jan 23 19:28:24 2015 +0300 ---------------------------------------------------------------------- .../GridDeploymentPerLoaderStore.java | 20 ++- .../GridDeploymentPerVersionStore.java | 4 +- .../processors/cache/GridCacheAdapter.java | 5 +- .../cache/GridCacheDeploymentManager.java | 150 ++++++++++--------- .../cache/GridCachePreloaderAdapter.java | 2 +- .../processors/cache/GridCacheProcessor.java | 5 +- .../processors/cache/GridCacheSwapManager.java | 3 +- .../preloader/GridDhtPartitionDemandPool.java | 2 +- .../streamer/GridStreamProcessor.java | 5 +- .../processors/streamer/IgniteStreamerEx.java | 3 +- .../processors/streamer/IgniteStreamerImpl.java | 5 +- 11 files changed, 103 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cccef78/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java index 9ea3fd3..aaf0b5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java @@ -91,7 +91,7 @@ public class GridDeploymentPerLoaderStore extends GridDeploymentStoreAdapter { } for (IsolatedDeployment dep : rmv) - dep.recordUndeployed(nodeId); + dep.recordUndeployed(); } } }; @@ -122,7 +122,7 @@ public class GridDeploymentPerLoaderStore extends GridDeploymentStoreAdapter { } for (IsolatedDeployment dep : cp) - dep.recordUndeployed(null); + dep.recordUndeployed(); if (log.isDebugEnabled()) log.debug(stopInfo()); @@ -150,7 +150,7 @@ public class GridDeploymentPerLoaderStore extends GridDeploymentStoreAdapter { } for (IsolatedDeployment dep : rmv) - dep.recordUndeployed(null); + dep.recordUndeployed(); if (log.isDebugEnabled()) log.debug("Registered deployment discovery listener: " + discoLsnr); @@ -351,7 +351,7 @@ public class GridDeploymentPerLoaderStore extends GridDeploymentStoreAdapter { } if (rmv) - dep.recordUndeployed(null); + dep.recordUndeployed(); } }); } @@ -384,7 +384,7 @@ public class GridDeploymentPerLoaderStore extends GridDeploymentStoreAdapter { } for (IsolatedDeployment dep : undeployed) - dep.recordUndeployed(null); + dep.recordUndeployed(); } /** {@inheritDoc} */ @@ -461,11 +461,9 @@ public class GridDeploymentPerLoaderStore extends GridDeploymentStoreAdapter { } /** - * Called to record all undeployed classes.. - * - * @param leftNodeId Left node ID. + * Called to record all undeployed classes. */ - void recordUndeployed(@Nullable UUID leftNodeId) { + void recordUndeployed() { assert !Thread.holdsLock(mux); GridEventStorageManager evts = ctx.event(); @@ -499,8 +497,8 @@ public class GridDeploymentPerLoaderStore extends GridDeploymentStoreAdapter { ClassLoader ldr = classLoader(); - ctx.cache().onUndeployed(leftNodeId, ldr); - ctx.stream().onUndeployed(leftNodeId, ldr); + ctx.cache().onUndeployed(ldr); + ctx.stream().onUndeployed(ldr); // Clear optimized marshaller's cache. If another marshaller is used, this is no-op. IgniteOptimizedMarshaller.onUndeploy(ldr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cccef78/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java index a37c02c..b615e74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java @@ -1259,8 +1259,8 @@ public class GridDeploymentPerVersionStore extends GridDeploymentStoreAdapter { ClassLoader ldr = classLoader(); - ctx.cache().onUndeployed(leftNodeId, ldr); - ctx.stream().onUndeployed(leftNodeId, ldr); + ctx.cache().onUndeployed(ldr); + ctx.stream().onUndeployed(ldr); // Clear optimized marshaller's cache. If another marshaller is used, this is no-op. IgniteOptimizedMarshaller.onUndeploy(ldr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cccef78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 112483a..2a60c13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -948,11 +948,10 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** * Undeploys and removes all entries for class loader. * - * @param leftNodeId Left node ID. * @param ldr Class loader to undeploy. */ - public void onUndeploy(@Nullable UUID leftNodeId, ClassLoader ldr) { - ctx.deploy().onUndeploy(leftNodeId, ldr); + public void onUndeploy(ClassLoader ldr) { + ctx.deploy().onUndeploy(ldr); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cccef78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java index d5ffb0d..2f007f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java @@ -53,7 +53,8 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap private volatile ClassLoader globalLdr; /** Undeploys. */ - private final ConcurrentLinkedQueue<CA> undeploys = new ConcurrentLinkedQueue<>(); + private final ConcurrentHashMap8<GridCacheContext, ConcurrentLinkedQueue<CA>> undeploys + = new ConcurrentHashMap8<>(); /** Per-thread deployment context. */ private ConcurrentMap<IgniteUuid, CachedDeploymentInfo<K, V>> deps = @@ -177,11 +178,16 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap /** * Undeploy all queued up closures. + * + * @param ctx Cache context. */ - public void unwind() { + public void unwind(GridCacheContext ctx) { int cnt = 0; - for (CA c = undeploys.poll(); c != null; c = undeploys.poll()) { + if (undeploys.get(ctx) == null) + return; + + for (CA c = undeploys.get(ctx).poll(); c != null; c = undeploys.get(ctx).poll()) { c.apply(); cnt++; @@ -194,110 +200,114 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap /** * Undeploys given class loader. * - * @param leftNodeId Left node ID. * @param ldr Class loader to undeploy. */ - public void onUndeploy(@Nullable final UUID leftNodeId, final ClassLoader ldr) { + public void onUndeploy(final ClassLoader ldr) { assert ldr != null; if (log.isDebugEnabled()) log.debug("Received onUndeploy() request [ldr=" + ldr + ", cctx=" + cctx + ']'); - undeploys.add(new CA() { - @Override public void apply() { - onUndeploy0(leftNodeId, ldr); - } - }); + for (final GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + undeploys.putIfAbsent(cacheCtx, new ConcurrentLinkedQueue<CA>()); + + undeploys.get(cacheCtx).add(new CA() { + @Override + public void apply() { + onUndeploy0(ldr, cacheCtx); + } + }); + } for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { // Unwind immediately for local and replicate caches. // We go through preloader for proper synchronization. - if (cacheCtx.isLocal() || cacheCtx.isReplicated()) + if (cacheCtx.isLocal()) cacheCtx.preloader().unwindUndeploys(); } } /** - * @param leftNodeId Left node ID. * @param ldr Loader. + * @param cacheCtx Cache context. */ - private void onUndeploy0(@Nullable final UUID leftNodeId, final ClassLoader ldr) { - for (final GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { - GridCacheAdapter<K, V> cache = cacheCtx.cache(); - - Set<K> keySet = cache.keySet(cacheCtx.vararg( - new P1<CacheEntry<K, V>>() { - @Override public boolean apply(CacheEntry<K, V> e) { - return cacheCtx.isNear() ? undeploy(e, cacheCtx.near()) || undeploy(e, cacheCtx.near().dht()) : - undeploy(e, cacheCtx.cache()); - } + private void onUndeploy0(final ClassLoader ldr, final GridCacheContext<K, V> cacheCtx) { + GridCacheAdapter<K, V> cache = cacheCtx.cache(); + + Set<K> keySet = cache.keySet(cacheCtx.vararg( + new P1<CacheEntry<K, V>>() { + @Override public boolean apply(CacheEntry<K, V> e) { + return cacheCtx.isNear() ? undeploy(e, cacheCtx.near()) || undeploy(e, cacheCtx.near().dht()) : + undeploy(e, cacheCtx.cache()); + } - /** - * @param e Entry. - * @param cache Cache. - * @return {@code True} if entry should be undeployed. - */ - private boolean undeploy(CacheEntry<K, V> e, GridCacheAdapter<K, V> cache) { - K k = e.getKey(); + /** + * @param e Entry. + * @param cache Cache. + * @return {@code True} if entry should be undeployed. + */ + private boolean undeploy(CacheEntry<K, V> e, GridCacheAdapter<K, V> cache) { + K k = e.getKey(); - GridCacheEntryEx<K, V> entry = cache.peekEx(e.getKey()); + GridCacheEntryEx<K, V> entry = cache.peekEx(e.getKey()); - if (entry == null) - return false; + if (entry == null) + return false; - V v; + V v; - try { - v = entry.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty()); - } - catch (GridCacheEntryRemovedException ignore) { - return false; - } - catch (IgniteException ignore) { - // Peek can throw runtime exception if unmarshalling failed. - return true; - } + try { + v = entry.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty()); + } + catch (GridCacheEntryRemovedException ignore) { + return false; + } + catch (IgniteException ignore) { + // Peek can throw runtime exception if unmarshalling failed. + return true; + } - assert k != null : "Key cannot be null for cache entry: " + e; + assert k != null : "Key cannot be null for cache entry: " + e; - ClassLoader keyLdr = U.detectObjectClassLoader(k); - ClassLoader valLdr = U.detectObjectClassLoader(v); + ClassLoader keyLdr = U.detectObjectClassLoader(k); + ClassLoader valLdr = U.detectObjectClassLoader(v); - boolean res = F.eq(ldr, keyLdr) || F.eq(ldr, valLdr); + boolean res = F.eq(ldr, keyLdr) || F.eq(ldr, valLdr); - if (log.isDebugEnabled()) - log.debug("Finished examining entry [entryCls=" + e.getClass() + - ", key=" + k + ", keyCls=" + k.getClass() + - ", valCls=" + (v != null ? v.getClass() : "null") + - ", keyLdr=" + keyLdr + ", valLdr=" + valLdr + ", res=" + res + ']'); + if (log.isDebugEnabled()) + log.debug("Finished examining entry [entryCls=" + e.getClass() + + ", key=" + k + ", keyCls=" + k.getClass() + + ", valCls=" + (v != null ? v.getClass() : "null") + + ", keyLdr=" + keyLdr + ", valLdr=" + valLdr + ", res=" + res + ']'); - return res; - } - })); + return res; + } + })); - Collection<K> keys = new LinkedList<>(); + Collection<K> keys = new ArrayList<>(); - for (K k : keySet) - keys.add(k); + for (K k : keySet) + keys.add(k); - if (log.isDebugEnabled()) - log.debug("Finished searching keys for undeploy [keysCnt=" + keys.size() + ']'); + if (log.isDebugEnabled()) + log.debug("Finished searching keys for undeploy [keysCnt=" + keys.size() + ']'); - cache.clearAll(keys, true); + cache.clearAll(keys, true); - if (cacheCtx.isNear()) - cacheCtx.near().dht().clearAll(keys, true); + if (cacheCtx.isNear()) + cacheCtx.near().dht().clearAll(keys, true); - GridCacheQueryManager<K, V> qryMgr = cacheCtx.queries(); + GridCacheQueryManager<K, V> qryMgr = cacheCtx.queries(); - if (qryMgr != null) - qryMgr.onUndeploy(ldr); + if (qryMgr != null) + qryMgr.onUndeploy(ldr); - // Examine swap for entries to undeploy. - int swapUndeployCnt = cacheCtx.isNear() ? - cacheCtx.near().dht().context().swap().onUndeploy(leftNodeId, ldr) : - cacheCtx.swap().onUndeploy(leftNodeId, ldr); + // Examine swap for entries to undeploy. + int swapUndeployCnt = cacheCtx.isNear() ? + cacheCtx.near().dht().context().swap().onUndeploy(ldr) : + cacheCtx.swap().onUndeploy(ldr); + if (!cacheCtx.system()) { U.quietAndWarn(log, ""); U.quietAndWarn( log, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cccef78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 8128b3b..29b4b4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -106,7 +106,7 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V> /** {@inheritDoc} */ @Override public void unwindUndeploys() { - cctx.deploy().unwind(); + cctx.deploy().unwind(cctx); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cccef78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index a893ef3..cd1abab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1764,13 +1764,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Callback invoked by deployment manager for whenever a class loader * gets undeployed. * - * @param leftNodeId Left node ID. * @param ldr Class loader. */ - public void onUndeployed(@Nullable UUID leftNodeId, ClassLoader ldr) { + public void onUndeployed(ClassLoader ldr) { if (!ctx.isStopping()) for (GridCacheAdapter<?, ?> cache : caches.values()) - cache.onUndeploy(leftNodeId, ldr); + cache.onUndeploy(ldr); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cccef78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index d1d55c7..135c305 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1434,11 +1434,10 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> { } /** - * @param leftNodeId Left Node ID. * @param ldr Undeployed class loader. * @return Undeploy count. */ - public int onUndeploy(UUID leftNodeId, ClassLoader ldr) { + public int onUndeploy(ClassLoader ldr) { if (cctx.portableEnabled()) return 0; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cccef78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 30b536c..7839bd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -389,7 +389,7 @@ public class GridDhtPartitionDemandPool<K, V> { demandLock.writeLock().lock(); try { - cctx.deploy().unwind(); + cctx.deploy().unwind(cctx); } finally { demandLock.writeLock().unlock(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cccef78/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java index 5af5d5b..84c9e57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java @@ -350,11 +350,10 @@ public class GridStreamProcessor extends GridProcessorAdapter { /** * Callback for undeployed class loaders. * - * @param leftNodeId Left node ID. * @param ldr Class loader. */ - public void onUndeployed(UUID leftNodeId, ClassLoader ldr) { + public void onUndeployed(ClassLoader ldr) { for (IgniteStreamerEx streamer : map.values()) - streamer.onUndeploy(leftNodeId, ldr); + streamer.onUndeploy(ldr); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cccef78/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerEx.java index 4fee074..0158fab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerEx.java @@ -80,10 +80,9 @@ public interface IgniteStreamerEx extends IgniteStreamer { /** * Callback for undeployed class loaders. All deployed events will be removed from window and local storage. * - * @param leftNodeId Left node ID which caused undeployment. * @param undeployedLdr Undeployed class loader. */ - public void onUndeploy(UUID leftNodeId, ClassLoader undeployedLdr); + public void onUndeploy(ClassLoader undeployedLdr); /** * Callback executed when streamer query completes. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cccef78/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java index 3542615..58fb036 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java @@ -688,10 +688,9 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable { } /** {@inheritDoc} */ - @Override public void onUndeploy(UUID leftNodeId, ClassLoader undeployedLdr) { + @Override public void onUndeploy(ClassLoader undeployedLdr) { if (log.isDebugEnabled()) - log.debug("Processing undeployment event [leftNodeId=" + leftNodeId + - ", undeployedLdr=" + undeployedLdr + ']'); + log.debug("Processing undeployment event undeployedLdr=" + undeployedLdr + ']'); unwindUndeploys(undeployedLdr, true); }