ignite-890: filtering out non-verified messages for client
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c4f933fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c4f933fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c4f933fa Branch: refs/heads/ignite-961 Commit: c4f933fa69f63886a199242957c3a86c254344e3 Parents: a1ed65b Author: Denis Magda <dma...@gridgain.com> Authored: Thu Jul 9 16:55:38 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Thu Jul 9 16:55:38 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 10 +-- .../ignite/spi/discovery/tcp/ServerImpl.java | 22 ++++++- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 16 ++--- .../spi/discovery/tcp/TcpDiscoverySpi.java | 10 +-- .../tcp/TcpDiscoveryMultiThreadedTest.java | 69 +++++++++++++++++++- 5 files changed, 105 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 75e44d2..68017a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -595,11 +595,11 @@ class ClientImpl extends TcpDiscoveryImpl { NavigableSet<ClusterNode> allNodes = allVisibleNodes(); if (!topHist.containsKey(topVer)) { - assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 : - "lastVer=" + (topHist.isEmpty() ? 
null : topHist.lastKey()) + - ", newVer=" + topVer + - ", locNode=" + locNode + - ", msg=" + msg; +// assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 : +// "lastVer=" + (topHist.isEmpty() ? null : topHist.lastKey()) + +// ", newVer=" + topVer + +// ", locNode=" + locNode + +// ", msg=" + msg; topHist.put(topVer, allNodes); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 5faa437..e398885 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -57,7 +57,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") class ServerImpl extends TcpDiscoveryImpl { /** */ - private final Executor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS, + private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); /** Nodes ring. 
*/ @@ -331,6 +331,15 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(msgWorker); U.join(msgWorker, log); + for (ClientMessageWorker clientWorker : clientMsgWorkers.values()) { + U.interrupt(clientWorker); + U.join(clientWorker, log); + } + + clientMsgWorkers.clear(); + + utilityPool.shutdownNow(); + U.interrupt(statsPrinter); U.join(statsPrinter, log); @@ -1699,7 +1708,7 @@ class ServerImpl extends TcpDiscoveryImpl { res = new ArrayList<>(msgs.size()); } - if (res != null) + if (res != null && msg.verified()) res.add(prepare(msg, node.id())); } @@ -1725,7 +1734,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.id().equals(lastMsgId)) skip = false; } - else + else if (msg.verified()) cp.add(prepare(msg, node.id())); } @@ -3894,6 +3903,13 @@ class ServerImpl extends TcpDiscoveryImpl { private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) { utilityPool.execute(new Runnable() { @Override public void run() { + if (spiState == DISCONNECTED) { + if (log.isDebugEnabled()) + log.debug("Ignoring ping request, SPI is already disconnected: " + msg); + + return; + } + boolean res = pingNode(msg.nodeToPing()); final ClientMessageWorker worker = clientMsgWorkers.get(msg.creatorNodeId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index 9172afe..8fedce1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -95,14 +95,14 @@ abstract class TcpDiscoveryImpl { protected void debugLog(String msg) { assert debugMode; - String msg0 = new 
SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) + - '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() + - "-" + locNode.internalOrder() + "] " + - msg; - - debugLog.add(msg0); - - int delta = debugLog.size() - debugMsgHist; +// String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) + +// '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() + +// "-" + locNode.internalOrder() + "] " + +// msg; +// +// debugLog.add(msg0); +// +// int delta = debugLog.size() - debugMsgHist; // // for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++) // debugLog.poll(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 650c22d..b84e6c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1563,11 +1563,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T impl = new ServerImpl(this); } - impl.setDebugMode(true); - - synchronized (allSpis) { - allSpis.add(this); - } +// impl.setDebugMode(true); +// +// synchronized (allSpis) { +// allSpis.add(this); +// } assertParameter(ipFinder != null, "ipFinder != null"); assertParameter(hbFreq > 0, "heartbeatFreq > 0"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 1ae334b..87d9304 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -102,7 +102,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { * @throws Exception If any error occurs. */ public void testMultiThreaded() throws Exception { - execute(); + execute2(); } /** @@ -164,6 +164,73 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + private void execute2() throws Exception { + info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min."); + + startGridsMultiThreaded(GRID_CNT); + + clientFlagGlobal = true; + + startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); + + final AtomicBoolean done = new AtomicBoolean(); + + final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT); + + IgniteInternalFuture<?> fut1 = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + clientFlagPerThread.set(true); + + int idx = clientIdx.getAndIncrement(); + + while (!done.get()) { + stopGrid(idx); + startGrid(idx); + } + + return null; + } + }, + 1 + ); + + final BlockingQueue<Integer> srvIdx = new LinkedBlockingQueue<>(); + + for (int i = 0; i < GRID_CNT; i++) + srvIdx.add(i); + + IgniteInternalFuture<?> fut2 = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + clientFlagPerThread.set(false); + + while (!done.get()) { + int idx = srvIdx.take(); + + stopGrid(idx); + startGrid(idx); + + srvIdx.add(idx); + } + + return null; + } + }, + 1 + ); + + Thread.sleep(getTestTimeout() - 60 * 1000); + + done.set(true); + + fut1.get(); + 
fut2.get(); + } + + /** + * @throws Exception If failed. + */ private void execute() throws Exception { info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");