Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 7af514904 -> 7825bb21f
# ignite-901 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7825bb21 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7825bb21 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7825bb21 Branch: refs/heads/ignite-901 Commit: 7825bb21ff34e88d66a33ed7a20378271021f5e1 Parents: 7af5149 Author: sboikov <sboi...@gridgain.com> Authored: Mon Jul 13 10:04:38 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Jul 13 10:04:38 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalGatewayImpl.java | 55 +++++++------------- .../processors/cache/GridCacheGateway.java | 26 ++++----- 2 files changed, 29 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7825bb21/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java index fa395e8..f6a9e51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; /** * @@ -43,7 +44,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { private IgniteFutureImpl<?> reconnectFut; /** */ - private volatile GridKernalState state = GridKernalState.STOPPED; + private final AtomicReference<GridKernalState> state = new AtomicReference<>(GridKernalState.STOPPED); /** */ @GridToStringExclude @@ -71,14 +72,17 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { rwLock.readLock(); - GridKernalState state = this.state; + GridKernalState state = this.state.get(); if (state != GridKernalState.STARTED) { // Unlock just acquired lock. rwLock.readUnlock(); - if (state == GridKernalState.DISCONNECTED) + if (state == GridKernalState.DISCONNECTED) { + assert reconnectFut != null; + throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName); + } throw illegalState(); } @@ -91,7 +95,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { rwLock.readLock(); - if (state == GridKernalState.DISCONNECTED) + if (state.get() == GridKernalState.DISCONNECTED) throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName); } @@ -142,40 +146,23 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { /** {@inheritDoc} */ @Override public GridFutureAdapter<?> onDisconnected() { - rwLock.readLock(); - - try { - if (state == GridKernalState.STARTED) { - GridFutureAdapter<?> fut = new GridFutureAdapter<>(); + GridFutureAdapter<?> fut = new GridFutureAdapter<>(); - reconnectFut = new IgniteFutureImpl<>(fut); + reconnectFut = new IgniteFutureImpl<>(fut); - state = GridKernalState.DISCONNECTED; + if (!state.compareAndSet(GridKernalState.STARTED, GridKernalState.DISCONNECTED)) { + ((GridFutureAdapter<?>)reconnectFut.internalFuture()).onDone(new IgniteCheckedException("Node stopped.")); - return fut; - } - else - return null; - } - finally { - rwLock.readUnlock(); + return null; } + + return fut; } /** {@inheritDoc} */ @Override public void onReconnected() { - rwLock.writeLock(); - - try { - if (state == GridKernalState.DISCONNECTED) { - state = GridKernalState.STARTED; - - ((GridFutureAdapter<?>)reconnectFut.internalFuture()).onDone(); - } - } - finally { - rwLock.writeUnlock(); - } + if (state.compareAndSet(GridKernalState.DISCONNECTED, GridKernalState.STARTED)) + ((GridFutureAdapter<?>)reconnectFut.internalFuture()).onDone(); } /** @@ -211,18 +198,16 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { @Override public void setState(GridKernalState state) { assert state != null; - GridKernalState state0 = this.state; - // NOTE: this method should always be called within write lock. - this.state = state; + this.state.set(state); - if (state0 == GridKernalState.DISCONNECTED) + if (reconnectFut != null) ((GridFutureAdapter<?>)reconnectFut.internalFuture()).onDone(new IgniteCheckedException("Node stopped.")); } /** {@inheritDoc} */ @Override public GridKernalState getState() { - return state; + return state.get(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7825bb21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index eeb9b7c..263a697 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -26,6 +26,7 @@ import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import javax.cache.*; +import java.util.concurrent.atomic.*; /** * Cache gateway. @@ -36,7 +37,7 @@ public class GridCacheGateway<K, V> { private final GridCacheContext<K, V> ctx; /** Stopped flag for dynamic caches. */ - private volatile State state = State.STARTED; + private final AtomicReference<State> state = new AtomicReference<>(State.STARTED); /** */ private IgniteFuture<?> reconnectFut; @@ -71,7 +72,7 @@ public class GridCacheGateway<K, V> { * @return {@code True} if cache is in started state. */ private boolean checkState(boolean lock, boolean stopErr) { - State state = this.state; + State state = this.state.get(); if (state != State.STARTED) { if (lock) @@ -250,7 +251,7 @@ public class GridCacheGateway<K, V> { * */ public void stopped() { - state = State.STOPPED; + state.set(State.STOPPED); } /** @@ -259,26 +260,18 @@ public class GridCacheGateway<K, V> { public void onDisconnected(IgniteFuture<?> reconnectFut) { assert reconnectFut != null; - if (state == State.STARTED) { - this.reconnectFut = reconnectFut; + this.reconnectFut = reconnectFut; - state = State.DISCONNECTED; - } + state.compareAndSet(State.STARTED, State.DISCONNECTED); } /** * @param stopped Cache stopped flag. */ public void reconnected(boolean stopped) { - rwLock.writeLock(); + State newState = stopped ? State.STOPPED : State.STARTED; - try { - if (state == State.DISCONNECTED) - state = stopped ? State.STOPPED : State.STARTED; - } - finally { - rwLock.writeUnlock(); - } + state.compareAndSet(State.DISCONNECTED, newState); } /** @@ -304,8 +297,7 @@ public class GridCacheGateway<K, V> { Thread.currentThread().interrupt(); try { - // No-op. - state = State.STOPPED; + state.set(State.STOPPED); } finally { rwLock.writeUnlock();