Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 94ea69250 -> 4b84c2fe0
IGNITE-45 - Fixed stopping of cache. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4971a58b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4971a58b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4971a58b Branch: refs/heads/ignite-45 Commit: 4971a58b44610ad3e9077a96e302e53860356e80 Parents: 963e9c2 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Sat Mar 21 21:42:04 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Sat Mar 21 21:42:04 2015 -0700 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 2 + .../processors/cache/GridCacheGateway.java | 103 ++++++++----------- .../processors/cache/GridCacheProcessor.java | 8 ++ 3 files changed, 53 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4971a58b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 16eec48..48c3001 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1752,6 +1752,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { Thread.currentThread().interrupt(); try { + ctx.cache().blockGateways(); + assert gw.getState() == STARTED || gw.getState() == STARTING; // No more kernal calls from this point on. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4971a58b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index 9059478..86a99ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -18,7 +18,8 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; -import org.apache.ignite.internal.*; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.GridSpinReadWriteLock; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -34,6 +35,9 @@ public class GridCacheGateway<K, V> { /** Stopped flag for dynamic caches. */ private volatile boolean stopped; + /** */ + private GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock(); + /** * @param ctx Cache context. */ @@ -50,28 +54,12 @@ public class GridCacheGateway<K, V> { if (ctx.deploymentEnabled()) ctx.deploy().onEnter(); - // Must unlock in case of unexpected errors to avoid - // deadlocks during kernal stop. - try { - if (stopped) - throw new IllegalStateException("Dynamic cache has been concurrently stopped: " + ctx.name()); + rwLock.readLock(); - ctx.kernalContext().gateway().readLock(); - } - catch (IllegalStateException e) { - // This exception is thrown only in case if grid has already been stopped - // and we must not call readUnlock. - throw e; - } - catch (RuntimeException | Error e) { - try { - ctx.kernalContext().gateway().readUnlock(); - } - catch (IllegalMonitorStateException ignore) { - // No-op. - } + if (stopped) { + rwLock.readUnlock(); - throw e; + throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name()); } } @@ -84,28 +72,14 @@ public class GridCacheGateway<K, V> { if (ctx.deploymentEnabled()) ctx.deploy().onEnter(); - GridKernalGateway kernalGateway = ctx.kernalContext().gateway(); - // Must unlock in case of unexpected errors to avoid // deadlocks during kernal stop. - try { - kernalGateway.readLockAnyway(); + rwLock.readLock(); - if (kernalGateway.getState() != GridKernalState.STARTED || stopped) { - kernalGateway.readUnlock(); + if (stopped) { + rwLock.readUnlock(); - return false; - } - } - catch (RuntimeException | Error e) { - try { - kernalGateway.readUnlock(); - } - catch (IllegalMonitorStateException ignore) { - // No-op. - } - - throw e; + return false; } return true; @@ -124,7 +98,7 @@ public class GridCacheGateway<K, V> { CU.unwindEvicts(ctx); } finally { - ctx.kernalContext().gateway().readUnlock(); + rwLock.readUnlock(); } } @@ -154,14 +128,17 @@ public class GridCacheGateway<K, V> { if (ctx.deploymentEnabled()) ctx.deploy().onEnter(); + rwLock.readLock(); + + if (stopped) { + rwLock.readUnlock(); + + throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name()); + } + // Must unlock in case of unexpected errors to avoid // deadlocks during kernal stop. try { - if (stopped) - throw new IllegalStateException("Dynamic cache has been concurrently stopped: " + ctx.name()); - - ctx.kernalContext().gateway().readLock(); - // Set thread local projection per call. GridCacheProjectionImpl<K, V> prev = ctx.projectionPerCall(); @@ -170,18 +147,8 @@ public class GridCacheGateway<K, V> { return prev; } - catch (IllegalStateException e) { - // This exception is thrown only in case if grid has already been stopped - // and we must not call readUnlock. - throw e; - } - catch (RuntimeException | Error e) { - try { - ctx.kernalContext().gateway().readUnlock(); - } - catch (IllegalMonitorStateException ignore) { - // No-op. - } + catch (RuntimeException e) { + rwLock.readUnlock(); throw e; } @@ -202,7 +169,7 @@ public class GridCacheGateway<K, V> { ctx.projectionPerCall(prev); } finally { - ctx.kernalContext().gateway().readUnlock(); + rwLock.readUnlock(); } } @@ -213,14 +180,30 @@ public class GridCacheGateway<K, V> { // Must prevent re-entries to the read lock. stopped = true; - ctx.kernalContext().gateway().writeLock(); + boolean interrupted = false; + + while (true) { + if (rwLock.tryWriteLock()) + break; + else { + try { + U.sleep(200); + } + catch (IgniteInterruptedCheckedException ignore) { + interrupted = true; + } + } + } + + if (interrupted) + Thread.currentThread().interrupt(); try { // No-op. stopped = true; } finally { - ctx.kernalContext().gateway().writeUnlock(); + rwLock.writeUnlock(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4971a58b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 469228f..543e8c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -760,6 +760,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { log.debug("Stopped cache processor."); } + /** + * Blocks all available gateways + */ + public void blockGateways() { + for (IgniteCacheProxy<?, ?> proxy : jCacheProxies.values()) + proxy.gate().onStopped(); + } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStop(boolean cancel) {