Repository: incubator-ignite Updated Branches: refs/heads/ignite-630 df0c9d567 -> df0c86afd
ignite-890: logs Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a1ed65b1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a1ed65b1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a1ed65b1 Branch: refs/heads/ignite-630 Commit: a1ed65b1f28e44e7633d1d33ba12a4d2242dbc80 Parents: 4031db7 Author: Denis Magda <dma...@gridgain.com> Authored: Thu Jul 9 11:27:35 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Thu Jul 9 11:27:35 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 7 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 96 ++++++++++++++++---- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 6 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 8 ++ .../tcp/TcpDiscoveryMultiThreadedTest.java | 4 +- 5 files changed, 95 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1ed65b1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 3f05f59..75e44d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -129,7 +129,8 @@ class ClientImpl extends TcpDiscoveryImpl { b.append("Stats: ").append(spi.stats).append(U.nl()); - U.quietAndInfo(log, b.toString()); + System.out.println(b.toString()); +// U.quietAndInfo(log, b.toString()); } /** {@inheritDoc} */ @@ -781,8 +782,8 @@ class ClientImpl extends TcpDiscoveryImpl { msg.senderNodeId(rmtNodeId); - if (log.isDebugEnabled()) - log.debug("Message has been received: " + msg); +// if (log.isDebugEnabled()) + log.info("Message has been received: " + msg); spi.stats.onMessageReceived(msg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1ed65b1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index d51293e..5faa437 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1326,7 +1326,7 @@ class ServerImpl extends TcpDiscoveryImpl { return; } - assert log.isInfoEnabled(); + //assert log.isInfoEnabled(); synchronized (mux) { StringBuilder b = new StringBuilder(U.nl()); @@ -1379,7 +1379,8 @@ class ServerImpl extends TcpDiscoveryImpl { b.append("Stats: ").append(spi.stats).append(U.nl()); - U.quietAndInfo(log, b.toString()); + System.out.println(b.toString()); + //U.quietAndInfo(log, b.toString()); } } @@ -1820,6 +1821,8 @@ class ServerImpl extends TcpDiscoveryImpl { } } + private static volatile boolean dumping; + /** * Message worker thread for messages processing. */ @@ -1988,7 +1991,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode) debugLog("New next node [newNext=" + newNext + ", formerNext=" + next + - ", ring=" + ring + ", failedNodes=" + failedNodes + ']'); + ", ring=" + ring + ", failedNodes=" + failedNodes + ", coord= " + resolveCoordinator() + ']'); U.closeQuiet(sock); @@ -2059,9 +2062,22 @@ class ServerImpl extends TcpDiscoveryImpl { if (!next.id().equals(nextId)) { // Node with different ID has bounded to the same port. - if (log.isDebugEnabled()) - log.debug("Failed to restore ring because next node ID received is not as " + - "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']'); + //if (log.isDebugEnabled()) + log.info("Failed to restore ring because next node ID received is not as " + + "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ", " + + "locNode " + locNodeId + ", ring = " + ring + ']'); + +// if (!dumping) { +// synchronized (TcpDiscoverySpi.allSpis) { +// dumping = true; +// System.out.println("------------ Start dump ------ "); +// for (TcpDiscoverySpi spi : TcpDiscoverySpi.allSpis) +// spi.dumpDebugInfo(); +// } +// System.out.println("------------- End dump -----------"); +// +// System.exit(1); +// } if (debugMode) debugLog("Failed to restore ring because next node ID received is not as " + @@ -2216,7 +2232,8 @@ class ServerImpl extends TcpDiscoveryImpl { ", next=" + next.id() + ", res=" + res + ']'); - if (debugMode) + if (debugMode && !(msg instanceof TcpDiscoveryDiscardMessage) && !(msg instanceof TcpDiscoveryHeartbeatMessage) + && !(msg instanceof TcpDiscoveryCustomEventMessage)) debugLog("Message has been sent to next node [msg=" + msg + ", next=" + next.id() + ", res=" + res + ']'); @@ -2804,28 +2821,48 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Accept client reconnect, restored pending messages " + "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); + + if (debugMode) + debugLog("Accept client reconnect, restored pending messages " + + "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); + } else { if (log.isDebugEnabled()) log.debug("Failing reconnecting client node because failed to restore pending " + "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); + if (debugMode) + debugLog("Failing reconnecting client node because failed to restore pending " + + "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); + processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, node.id(), node.internalOrder())); } } } - else if (log.isDebugEnabled()) - log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']'); + else { + if (log.isDebugEnabled()) + log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']'); + + if (debugMode) + debugLog("Reconnecting client node is already failed [nodeId=" + nodeId + ']'); + } if (isLocNodeRouter) { ClientMessageWorker wrk = clientMsgWorkers.get(nodeId); if (wrk != null) wrk.addMessage(msg); - else if (log.isDebugEnabled()) - log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" + - locNodeId + ", clientNodeId=" + nodeId + ']'); + else { + if (log.isDebugEnabled()) + log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" + + locNodeId + ", clientNodeId=" + nodeId + ']'); + + if (debugMode) + debugLog("Failed to reconnect client node (disconnected during the process) [locNodeId=" + + locNodeId + ", clientNodeId=" + nodeId + ']'); + } } else { if (ring.hasRemoteNodes()) @@ -3219,6 +3256,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Starting local node stop procedure."); + if (debugMode) + debugLog("Starting local node stop procedure."); + spiState = STOPPING; mux.notifyAll(); @@ -3226,6 +3266,9 @@ class ServerImpl extends TcpDiscoveryImpl { } if (msg.verified() || !ring.hasRemoteNodes() || msg.senderNodeId() != null) { + if (debugMode) + debugLog("Do stop local node: [msg=" + msg + ", hasRemote=" + ring.hasRemoteNodes() + ']'); + if (spi.ipFinder.isShared() && !ring.hasRemoteNodes()) { try { spi.ipFinder.unregisterAddresses(locNode.socketAddresses()); @@ -3252,8 +3295,11 @@ class ServerImpl extends TcpDiscoveryImpl { } if (ring.node(msg.senderNodeId()) == null) { - if (log.isDebugEnabled()) - log.debug("Discarding node left message since sender node is not in topology: " + msg); +// if (log.isDebugEnabled()) + log.info("Discarding node left message since sender node is not in topology: " + msg); + + if (debugMode) + debugLog("Discarding node left message since sender node is not in topology: " + msg); return; } @@ -3266,8 +3312,11 @@ class ServerImpl extends TcpDiscoveryImpl { } } else { - if (log.isDebugEnabled()) - log.debug("Discarding node left message since node was not found: " + msg); +// if (log.isDebugEnabled()) + log.info("Discarding node left message since node was not found: " + msg); + + if (debugMode) + debugLog("Discarding node left message since node was not found: " + msg); return; } @@ -3276,6 +3325,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (locNodeCoord) { if (msg.verified()) { + if (!locNode.id().equals(msg.verifierNodeId())) + System.out.println("Fuck!!: [loc=" + locNode + ", verifier=" + msg.verifierNodeId()); + spi.stats.onRingMessageReceived(msg); addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id())); @@ -3294,6 +3346,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Removed node from topology: " + leftNode); + if (debugMode) + debugLog("Removed node from topology: " + leftNode); + long topVer; if (locNodeCoord) { @@ -3329,6 +3384,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Sent verified node left message to leaving node: " + msg); + + if (debugMode) + debugLog("Sent verified node left message to leaving node: " + msg); } catch (IgniteCheckedException | IOException e) { if (log.isDebugEnabled()) @@ -3372,6 +3430,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Unable to send message across the ring (topology has no remote nodes): " + msg); + if (debugMode) + debugLog("Unable to send message across the ring (topology has no remote nodes): " + msg); + U.closeQuiet(sock); } } @@ -4290,7 +4351,8 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onMessageReceived(msg); - if (debugMode && recordable(msg)) + if (debugMode && !(msg instanceof TcpDiscoveryDiscardMessage) && !(msg instanceof TcpDiscoveryHeartbeatMessage) + && !(msg instanceof TcpDiscoveryCustomEventMessage) && recordable(msg)) debugLog("Message has been received: " + msg); if (msg instanceof TcpDiscoveryJoinRequestMessage) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1ed65b1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index ace917f..9172afe 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -103,9 +103,9 @@ abstract class TcpDiscoveryImpl { debugLog.add(msg0); int delta = debugLog.size() - debugMsgHist; - - for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++) - debugLog.poll(); +// +// for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++) +// debugLog.poll(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1ed65b1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 7663fe6..650c22d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -327,6 +327,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** */ private boolean forceSrvMode; + public static volatile ArrayList<TcpDiscoverySpi> allSpis = new ArrayList<>(); + /** {@inheritDoc} */ @Override public String getSpiState() { return impl.getSpiState(); @@ -1561,6 +1563,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T impl = new ServerImpl(this); } + impl.setDebugMode(true); + + synchronized (allSpis) { + allSpis.add(this); + } + assertParameter(ipFinder != null, "ipFinder != null"); assertParameter(hbFreq > 0, "heartbeatFreq > 0"); assertParameter(netTimeout > 0, "networkTimeout > 0"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1ed65b1/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 0bf7cad..1ae334b 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -95,15 +95,13 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected long getTestTimeout() { - return 5 * 60 * 1000; + return 2 * 60 * 1000; } /** * @throws Exception If any error occurs. */ public void testMultiThreaded() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1100"); - execute(); }