Repository: incubator-ignite Updated Branches: refs/heads/ignite-709_2 d59403b48 -> f9f766263
# IGNITE-709 Add test to check pending messages on client reconnect. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f9f76626 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f9f76626 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f9f76626 Branch: refs/heads/ignite-709_2 Commit: f9f766263dfa9ea170cae4816e7667e0e3adb310 Parents: d59403b Author: sevdokimov <sergey.evdoki...@jetbrains.com> Authored: Sat May 2 20:56:41 2015 +0300 Committer: sevdokimov <sergey.evdoki...@jetbrains.com> Committed: Sat May 2 20:56:41 2015 +0300 ---------------------------------------------------------------------- .../tcp/TcpClientDiscoverySelfTest.java | 53 ++++++++++++++++---- 1 file changed, 43 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f76626/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java index fd9b0f7..05fb52b 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java @@ -39,7 +39,6 @@ import java.net.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.events.EventType.*; @@ -327,7 +326,7 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest { attachListeners(2, 2); - ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).suspend(); + ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll(); stopGrid("server-2"); @@ -786,21 +785,55 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest { */ private static class TestTcpClientDiscovery extends TcpClientDiscoverySpi { /** */ - private final Lock ioOperationsLock = new ReentrantLock(); + private final Object mux = new Object(); + + /** */ + private final AtomicBoolean writeLock = new AtomicBoolean(); + + /** */ + private final AtomicBoolean openSockLock = new AtomicBoolean(); + + /** + * @param lock Lock. + */ + private void waitFor(AtomicBoolean lock) { + try { + synchronized (mux) { + while (lock.get()) + mux.wait(); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new RuntimeException(e); + } + } + + /** + * @param isPause Is lock. + * @param locks Locks. + */ + private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) { + synchronized (mux) { + for (AtomicBoolean lock : locks) + lock.set(isPause); + + mux.notifyAll(); + } + } /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException { - ioOperationsLock.lock(); - ioOperationsLock.unlock(); + waitFor(writeLock); super.writeToSocket(sock, msg, bout); } /** {@inheritDoc} */ @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException { - ioOperationsLock.lock(); - ioOperationsLock.unlock(); + waitFor(openSockLock); return super.openSocket(sockAddr); } @@ -808,8 +841,8 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest { /** * */ - private void suspend() { - ioOperationsLock.lock(); + private void pauseAll() { + pauseResumeOperation(true, openSockLock, writeLock); brokeConnection(); } @@ -818,7 +851,7 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest { * */ private void resume() { - ioOperationsLock.unlock(); + pauseResumeOperation(false, openSockLock, writeLock); } } }