IGNITE-709 Fix tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e575164b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e575164b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e575164b Branch: refs/heads/ignite-23 Commit: e575164bbfb8f4f5b7c56c3470b234fff52cf8a3 Parents: d59a712 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Tue May 26 14:41:18 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Tue May 26 14:41:18 2015 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 106 ++++++++++--------- .../continuous/GridContinuousProcessor.java | 37 ++++++- .../apache/ignite/internal/GridSelfTest.java | 4 +- 3 files changed, 93 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e575164b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index bb8366a..f33fa39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -137,80 +137,86 @@ class GridEventConsumeHandler implements GridContinuousHandler { private boolean notificationInProgress; @Override public void onEvent(Event evt) { - synchronized (notificationQueue) { - notificationQueue.add(new T3<>(nodeId, routineId, evt)); + if (filter != null && !filter.apply(evt)) + return; - if (!notificationInProgress) { - ctx.getSystemExecutorService().submit(new Runnable() { - @Override public void run() { - while (true) { - T3<UUID, UUID, Event> t3; - - synchronized (notificationQueue) { - t3 = notificationQueue.poll(); + if (loc) { + if (!cb.apply(nodeId, evt)) + ctx.continuous().stopRoutine(routineId); + } + else { + if (ctx.discovery().node(nodeId) == null) + return; - if (t3 == null) { - notificationInProgress = false; + synchronized (notificationQueue) { + notificationQueue.add(new T3<>(nodeId, routineId, evt)); - return; - } - } + if (!notificationInProgress) { + ctx.getSystemExecutorService().submit(new Runnable() { + @Override public void run() { + if (!ctx.continuous().lockStopping()) + return; try { - Event evt = t3.get3(); + while (true) { + T3<UUID, UUID, Event> t3; - if (filter != null && !filter.apply(evt)) - continue; + synchronized (notificationQueue) { + t3 = notificationQueue.poll(); - if (loc) { - if (!cb.apply(nodeId, evt)) { - ctx.continuous().stopRoutine(routineId); + if (t3 == null) { + notificationInProgress = false; - return; + return; + } } - continue; - } + try { + Event evt = t3.get3(); - ClusterNode node = ctx.discovery().node(t3.get1()); + EventWrapper wrapper = new EventWrapper(evt); - if (node == null) - continue; + if (evt instanceof CacheEvent) { + String cacheName = ((CacheEvent)evt).cacheName(); - EventWrapper wrapper = new EventWrapper(evt); + ClusterNode node = ctx.discovery().node(t3.get1()); - if (evt instanceof CacheEvent) { - String cacheName = ((CacheEvent)evt).cacheName(); + if (node == null) + continue; - if (ctx.config().isPeerClassLoadingEnabled() - && ctx.discovery().cacheNode(node, cacheName)) { - wrapper.p2pMarshal(ctx.config().getMarshaller()); + if (ctx.config().isPeerClassLoadingEnabled() + && ctx.discovery().cacheNode(node, cacheName)) { + wrapper.p2pMarshal(ctx.config().getMarshaller()); - wrapper.cacheName = cacheName; + wrapper.cacheName = cacheName; - GridCacheDeploymentManager depMgr = - ctx.cache().internalCache(cacheName).context().deploy(); + GridCacheDeploymentManager depMgr = ctx.cache() + .internalCache(cacheName).context().deploy(); - depMgr.prepare(wrapper); + depMgr.prepare(wrapper); + } + } + + ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false, + false); + } + catch (ClusterTopologyCheckedException ignored) { + // No-op. + } + catch (Throwable e) { + U.error(ctx.log(GridEventConsumeHandler.class), + "Failed to send event notification to node: " + nodeId, e); } } - - ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, - false, false); - } - catch (ClusterTopologyCheckedException - | IgniteInterruptedCheckedException ignored) { - // No-op. } - catch (Throwable e) { - U.error(ctx.log(GridEventConsumeHandler.class), - "Failed to send event notification to node: " + nodeId, e); + finally { + ctx.continuous().unlockStopping(); } } - } - }); + }); - notificationInProgress = true; + notificationInProgress = true; + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e575164b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 71a2a66..67b32a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -93,7 +93,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { private int retryCnt = 3; /** */ - private volatile boolean processorStopped; + private final ReentrantReadWriteLock processorStopLock = new ReentrantReadWriteLock(); + + /** */ + private boolean processorStopped; /** * @param ctx Kernal context. @@ -259,9 +262,39 @@ public class GridContinuousProcessor extends GridProcessorAdapter { log.debug("Continuous processor started."); } + /** + * @return {@code true} if lock successful, {@code false} if processor already stopped. + */ + @SuppressWarnings("LockAcquiredButNotSafelyReleased") + public boolean lockStopping() { + processorStopLock.readLock().lock(); + + if (processorStopped) { + processorStopLock.readLock().unlock(); + + return false; + } + + return true; + } + + /** + * + */ + public void unlockStopping() { + processorStopLock.readLock().unlock(); + } + /** {@inheritDoc} */ @Override public void onKernalStop(boolean cancel) { - processorStopped = true; + processorStopLock.writeLock().lock(); + + try { + processorStopped = true; + } + finally { + processorStopLock.writeLock().unlock(); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e575164b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java index b4dce6c..7f5ee54 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java @@ -118,8 +118,8 @@ public class GridSelfTest extends GridProjectionAbstractTest { g.message().remoteListen(null, new MessagingListenActor<String>() { @Override protected void receive(UUID nodeId, String rcvMsg) throws Throwable { - assert locNodeId.equals(nodeId); - assert msg.equals(rcvMsg); + assertEquals(locNodeId, nodeId); + assertEquals(msg, rcvMsg); stop(rcvMsg); }