http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index fd17791..359de1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -157,12 +157,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Default idle connection timeout (value is <tt>30000</tt>ms). */ public static final long DFLT_IDLE_CONN_TIMEOUT = 30000; - /** Default value for connection buffer flush frequency (value is <tt>100</tt> ms). */ - public static final long DFLT_CONN_BUF_FLUSH_FREQ = 100; - - /** Default value for connection buffer size (value is <tt>0</tt>). */ - public static final int DFLT_CONN_BUF_SIZE = 0; - /** Default socket send and receive buffer size. */ public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024; @@ -267,7 +261,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug("Session was closed but there are unacknowledged messages, " + "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); - recoveryWorker.addReconnectRequest(recoveryData); + commWorker.addReconnectRequest(recoveryData); } } else @@ -603,13 +597,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Idle connection timeout. */ private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT; - /** Connection buffer flush frequency. */ - private volatile long connBufFlushFreq = DFLT_CONN_BUF_FLUSH_FREQ; - - /** Connection buffer size. 
*/ - @SuppressWarnings("RedundantFieldInitialization") - private int connBufSize = DFLT_CONN_BUF_SIZE; - /** Connect timeout. */ private long connTimeout = DFLT_CONN_TIMEOUT; @@ -647,17 +634,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Socket write timeout. */ private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT; - /** Idle client worker. */ - private IdleClientWorker idleClientWorker; - - /** Flush client worker. */ - private ClientFlushWorker clientFlushWorker; - - /** Socket timeout worker. */ - private SocketTimeoutWorker sockTimeoutWorker; - - /** Recovery worker. */ - private RecoveryWorker recoveryWorker; + /** Recovery and idle clients handler. */ + private CommunicationWorker commWorker; /** Clients. */ private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap(); @@ -882,31 +860,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * Sets connection buffer size. If set to {@code 0} connection buffer is disabled. - * <p> - * If not provided, default value is {@link #DFLT_CONN_BUF_SIZE}. * * @param connBufSize Connection buffer size. * @see #setConnectionBufferFlushFrequency(long) */ @IgniteSpiConfiguration(optional = true) public void setConnectionBufferSize(int connBufSize) { - this.connBufSize = connBufSize; + // No-op. } /** {@inheritDoc} */ @Override public int getConnectionBufferSize() { - return connBufSize; + return 0; } /** {@inheritDoc} */ @IgniteSpiConfiguration(optional = true) @Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) { - this.connBufFlushFreq = connBufFlushFreq; + // No-op. 
} /** {@inheritDoc} */ @Override public long getConnectionBufferFlushFrequency() { - return connBufFlushFreq; + return 0; } /** @@ -1174,8 +1150,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assertParameter(locPort <= 0xffff, "locPort < 0xffff"); assertParameter(locPortRange >= 0, "locPortRange >= 0"); assertParameter(idleConnTimeout > 0, "idleConnTimeout > 0"); - assertParameter(connBufFlushFreq > 0, "connBufFlushFreq > 0"); - assertParameter(connBufSize >= 0, "connBufSize >= 0"); assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0"); assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0"); assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0"); @@ -1245,8 +1219,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("idleConnTimeout", idleConnTimeout)); log.debug(configInfo("directBuf", directBuf)); log.debug(configInfo("directSendBuf", directSndBuf)); - log.debug(configInfo("connBufSize", connBufSize)); - log.debug(configInfo("connBufFlushFreq", connBufFlushFreq)); log.debug(configInfo("selectorsCnt", selectorsCnt)); log.debug(configInfo("tcpNoDelay", tcpNoDelay)); log.debug(configInfo("sockSndBuf", sockSndBuf)); @@ -1261,11 +1233,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize)); } - if (connBufSize > 8192) - U.warn(log, "Specified communication IO buffer size is larger than recommended (ignore if done " + - "intentionally) [specified=" + connBufSize + ", recommended=8192]", - "Specified communication IO buffer size is larger than recommended (ignore if done intentionally)."); - if (!tcpNoDelay) U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution " + "since may produce significant delays with some scenarios."); @@ -1274,23 +1241,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter nioSrvr.start(); - idleClientWorker = new IdleClientWorker(); + commWorker = new CommunicationWorker(); - 
idleClientWorker.start(); - - recoveryWorker = new RecoveryWorker(); - - recoveryWorker.start(); - - if (connBufSize > 0) { - clientFlushWorker = new ClientFlushWorker(); - - clientFlushWorker.start(); - } - - sockTimeoutWorker = new SocketTimeoutWorker(); - - sockTimeoutWorker.start(); + commWorker.start(); // Ack start. if (log.isDebugEnabled()) @@ -1445,15 +1398,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (nioSrvr != null) nioSrvr.stop(); - U.interrupt(idleClientWorker); - U.interrupt(clientFlushWorker); - U.interrupt(sockTimeoutWorker); - U.interrupt(recoveryWorker); + U.interrupt(commWorker); - U.join(idleClientWorker, log); - U.join(clientFlushWorker, log); - U.join(sockTimeoutWorker, log); - U.join(recoveryWorker, log); + U.join(commWorker, log); // Force closing on stop (safety). for (GridCommunicationClient client : clients.values()) @@ -1461,7 +1408,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Clear resources. nioSrvr = null; - idleClientWorker = null; + commWorker = null; boundTcpPort = -1; @@ -1899,7 +1846,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ) throws IgniteCheckedException { HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); - sockTimeoutWorker.addTimeoutObject(obj); + addTimeoutObject(obj); long rcvCnt = 0; @@ -2005,7 +1952,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter boolean cancelled = obj.cancel(); if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); + removeTimeoutObject(obj); // Ignoring whatever happened after timeout - reporting only timeout event. 
if (!cancelled) @@ -2041,15 +1988,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (nioSrvr != null) nioSrvr.stop(); - U.interrupt(idleClientWorker); - U.interrupt(clientFlushWorker); - U.interrupt(sockTimeoutWorker); - U.interrupt(recoveryWorker); + U.interrupt(commWorker); - U.join(idleClientWorker, log); - U.join(clientFlushWorker, log); - U.join(sockTimeoutWorker, log); - U.join(recoveryWorker, log); + U.join(commWorker, log); for (GridCommunicationClient client : clients.values()) client.forceClose(); @@ -2156,80 +2097,95 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * */ - private class IdleClientWorker extends IgniteSpiThread { + private class CommunicationWorker extends IgniteSpiThread { + /** */ + private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>(); + /** * */ - IdleClientWorker() { - super(gridName, "nio-idle-client-collector", log); + private CommunicationWorker() { + super(gridName, "tcp-comm-worker", log); } /** {@inheritDoc} */ - @SuppressWarnings({"BusyWait"}) @Override protected void body() throws InterruptedException { + if (log.isDebugEnabled()) + log.debug("Tcp communication worker has been started."); + while (!isInterrupted()) { - cleanupRecovery(); + GridNioRecoveryDescriptor recoveryDesc = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); - for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) { - UUID nodeId = e.getKey(); + if (recoveryDesc != null) + processRecovery(recoveryDesc); + else + processIdle(); + } + } - GridCommunicationClient client = e.getValue(); + /** + * + */ + private void processIdle() { + cleanupRecovery(); - ClusterNode node = getSpiContext().node(nodeId); + for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) { + UUID nodeId = e.getKey(); - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Forcing close of non-existent node connection: " + nodeId); + GridCommunicationClient client = e.getValue(); - 
client.forceClose(); + ClusterNode node = getSpiContext().node(nodeId); - clients.remove(nodeId, client); + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Forcing close of non-existent node connection: " + nodeId); - continue; - } + client.forceClose(); - GridNioRecoveryDescriptor recovery = null; + clients.remove(nodeId, client); - if (client instanceof GridTcpNioCommunicationClient) { - recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); + continue; + } - if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { - RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); + GridNioRecoveryDescriptor recovery = null; - if (log.isDebugEnabled()) - log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + - ", rcvCnt=" + msg.received() + ']'); + if (client instanceof GridTcpNioCommunicationClient) { + recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); - nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); + if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { + RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); - recovery.lastAcknowledged(msg.received()); + if (log.isDebugEnabled()) + log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + + ", rcvCnt=" + msg.received() + ']'); - continue; - } - } + nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); - long idleTime = client.getIdleTime(); + recovery.lastAcknowledged(msg.received()); - if (idleTime >= idleConnTimeout) { - if (recovery != null && - recovery.nodeAlive(getSpiContext().node(nodeId)) && - !recovery.messagesFutures().isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Node connection is idle, but there are unacknowledged messages, " + - "will wait: " + nodeId); + continue; + } + } - continue; - } + long idleTime = client.getIdleTime(); + if (idleTime >= 
idleConnTimeout) { + if (recovery != null && + recovery.nodeAlive(getSpiContext().node(nodeId)) && + !recovery.messagesFutures().isEmpty()) { if (log.isDebugEnabled()) - log.debug("Closing idle node connection: " + nodeId); + log.debug("Node connection is idle, but there are unacknowledged messages, " + + "will wait: " + nodeId); - if (client.close() || client.closed()) - clients.remove(nodeId, client); + continue; } - } - Thread.sleep(idleConnTimeout); + if (log.isDebugEnabled()) + log.debug("Closing idle node connection: " + nodeId); + + if (client.close() || client.closed()) + clients.remove(nodeId, client); + } } } @@ -2264,212 +2220,39 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } } } - } - - /** - * - */ - private class ClientFlushWorker extends IgniteSpiThread { - /** - * - */ - ClientFlushWorker() { - super(gridName, "nio-client-flusher", log); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"BusyWait"}) - @Override protected void body() throws InterruptedException { - while (!isInterrupted()) { - long connBufFlushFreq0 = connBufFlushFreq; - - for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) { - GridCommunicationClient client = entry.getValue(); - - if (client.reserve()) { - boolean err = true; - - try { - client.flushIfNeeded(connBufFlushFreq0); - - err = false; - } - catch (IOException e) { - if (getSpiContext().pingNode(entry.getKey())) - U.error(log, "Failed to flush client: " + client, e); - else { - if (log.isDebugEnabled()) - log.debug("Failed to flush client (node left): " + client); - - onException("Failed to flush client (node left): " + client, e); - } - } - finally { - if (err) - client.forceClose(); - else - client.release(); - } - } - } - - Thread.sleep(connBufFlushFreq0); - } - } - } - - /** - * Handles sockets timeouts. - */ - private class SocketTimeoutWorker extends IgniteSpiThread { - /** Time-based sorted set for timeout objects. 
*/ - private final GridConcurrentSkipListSet<HandshakeTimeoutObject> timeoutObjs = - new GridConcurrentSkipListSet<>(new Comparator<HandshakeTimeoutObject>() { - @Override public int compare(HandshakeTimeoutObject o1, HandshakeTimeoutObject o2) { - long time1 = o1.endTime(); - long time2 = o2.endTime(); - - long id1 = o1.id(); - long id2 = o2.id(); - - return time1 < time2 ? -1 : time1 > time2 ? 1 : - id1 < id2 ? -1 : id1 > id2 ? 1 : 0; - } - }); - - /** Mutex. */ - private final Object mux0 = new Object(); - - /** - * - */ - SocketTimeoutWorker() { - super(gridName, "tcp-comm-sock-timeout-worker", log); - } - - /** - * @param timeoutObj Timeout object to add. - */ - @SuppressWarnings({"NakedNotify"}) - public void addTimeoutObject(HandshakeTimeoutObject timeoutObj) { - assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE; - - timeoutObjs.add(timeoutObj); - - if (timeoutObjs.firstx() == timeoutObj) { - synchronized (mux0) { - mux0.notifyAll(); - } - } - } /** - * @param timeoutObj Timeout object to remove. + * @param recoveryDesc Recovery descriptor. 
*/ - public void removeTimeoutObject(HandshakeTimeoutObject timeoutObj) { - assert timeoutObj != null; - - timeoutObjs.remove(timeoutObj); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Socket timeout worker has been started."); + private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) { + ClusterNode node = recoveryDesc.node(); - while (!isInterrupted()) { - long now = U.currentTimeMillis(); - - for (Iterator<HandshakeTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) { - HandshakeTimeoutObject timeoutObj = iter.next(); - - if (timeoutObj.endTime() <= now) { - iter.remove(); + if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) + return; - timeoutObj.onTimeout(); - } - else - break; - } - - synchronized (mux0) { - while (true) { - // Access of the first element must be inside of - // synchronization block, so we don't miss out - // on thread notification events sent from - // 'addTimeoutObject(..)' method. 
- HandshakeTimeoutObject first = timeoutObjs.firstx(); + try { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); - if (first != null) { - long waitTime = first.endTime() - U.currentTimeMillis(); + GridCommunicationClient client = reserveClient(node); - if (waitTime > 0) - mux0.wait(waitTime); - else - break; - } - else - mux0.wait(5000); - } - } + client.release(); } - } - } - - /** - * - */ - private class RecoveryWorker extends IgniteSpiThread { - /** */ - private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>(); - - /** - * - */ - private RecoveryWorker() { - super(gridName, "tcp-comm-recovery-worker", log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Recovery worker has been started."); - - while (!isInterrupted()) { - GridNioRecoveryDescriptor recoveryDesc = q.take(); - - assert recoveryDesc != null; - - ClusterNode node = recoveryDesc.node(); - - if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) - continue; - - try { + catch (IgniteCheckedException | IgniteException e) { + if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) { if (log.isDebugEnabled()) - log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); - - GridCommunicationClient client = reserveClient(node); + log.debug("Recovery reconnect failed, will retry " + + "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); - client.release(); + addReconnectRequest(recoveryDesc); } - catch (IgniteCheckedException e) { - if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, will retry " + - "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); - - addReconnectRequest(recoveryDesc); - } - else { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, " + - "node 
left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); - - onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", - e); - } + else { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, " + + "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", + e); } } } @@ -2497,12 +2280,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * */ - private static class HandshakeTimeoutObject<T> { - /** */ - private static final AtomicLong idGen = new AtomicLong(); - + private static class HandshakeTimeoutObject<T> implements IgniteSpiTimeoutObject { /** */ - private final long id = idGen.incrementAndGet(); + private final IgniteUuid id = IgniteUuid.randomUuid(); /** */ private final T obj; @@ -2533,34 +2313,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return done.compareAndSet(false, true); } - /** - * @return {@code True} if object has not yet been canceled. - */ - boolean onTimeout() { + /** {@inheritDoc} */ + @Override public void onTimeout() { if (done.compareAndSet(false, true)) { // Close socket - timeout occurred. if (obj instanceof GridCommunicationClient) ((GridCommunicationClient)obj).forceClose(); else U.closeQuiet((AbstractInterruptibleChannel)obj); - - return true; } - - return false; } - /** - * @return End time. - */ - long endTime() { + /** {@inheritDoc} */ + @Override public long endTime() { return endTime; } - /** - * @return ID. - */ - long id() { + /** {@inheritDoc} */ + @Override public IgniteUuid id() { return id; }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java index 5c80e6e..6f5a738 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java @@ -171,8 +171,6 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean { * This frequency defines how often system will advice to flush * connection buffer. * <p> - * If not provided, default value is {@link TcpCommunicationSpi#DFLT_CONN_BUF_FLUSH_FREQ}. - * <p> * This property is used only if {@link #getConnectionBufferSize()} is greater than {@code 0}. * * @param connBufFlushFreq Flush frequency. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java index 7560999..b952087 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java @@ -17,12 +17,13 @@ package org.apache.ignite.spi.discovery; +import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.tcp.*; import org.jetbrains.annotations.*; -import java.io.*; import java.util.*; /** @@ -102,7 +103,7 @@ public interface DiscoverySpi extends IgniteSpi { * * @param exchange Discovery data exchange handler. */ - public void setDataExchange(DiscoverySpiDataExchange exchange); + public TcpDiscoverySpi setDataExchange(DiscoverySpiDataExchange exchange); /** * Sets discovery metrics provider. Use metrics provided by @@ -111,7 +112,7 @@ public interface DiscoverySpi extends IgniteSpi { * * @param metricsProvider Provider of metrics data. */ - public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider); + public TcpDiscoverySpi setMetricsProvider(DiscoveryMetricsProvider metricsProvider); /** * Tells discovery SPI to disconnect from topology. This is very close to calling @@ -141,9 +142,10 @@ public interface DiscoverySpi extends IgniteSpi { /** * Sends custom message across the ring. - * @param evt Event. + * @param msg Custom message. + * @throws IgniteException if failed to marshal the message.
*/ - public void sendCustomEvent(Serializable evt); + public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException; /** * Initiates failure of provided node. @@ -151,4 +153,12 @@ public interface DiscoverySpi extends IgniteSpi { * @param nodeId Node ID. */ public void failNode(UUID nodeId); + + /** + * Whether or not discovery is started in client mode. + * + * @return {@code true} if node is in client mode. + * @throws IllegalStateException If discovery SPI has not started. + */ + public boolean isClientMode() throws IllegalStateException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java new file mode 100644 index 0000000..15e943b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery; + +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Message to send across ring. + * + * @see org.apache.ignite.internal.managers.discovery.GridDiscoveryManager#sendCustomEvent( + * org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage) + */ +public interface DiscoverySpiCustomMessage extends Serializable { + /** + * Called when message passed the ring. + */ + @Nullable public DiscoverySpiCustomMessage ackMessage(); + + /** + * @return {@code true} if message can be modified during listener notification. Changes will be sent to next nodes. + */ + public boolean isMutable(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java index 7f17fe4..f46869d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java @@ -18,10 +18,9 @@ package org.apache.ignite.spi.discovery; import org.apache.ignite.cluster.*; -import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.*; import org.jetbrains.annotations.*; -import java.io.*; import java.util.*; /** @@ -47,5 +46,5 @@ public interface DiscoverySpiListener { ClusterNode node, Collection<ClusterNode> topSnapshot, @Nullable Map<Long, Collection<ClusterNode>> topHist, - @Nullable Serializable data); + @Nullable DiscoverySpiCustomMessage data); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java new file mode 100644 index 0000000..d064c8d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -0,0 +1,1478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.jetbrains.annotations.*; +import org.jsr166.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*; + +/** + * + */ +class ClientImpl extends TcpDiscoveryImpl { + /** */ + private static final Object JOIN_TIMEOUT = "JOIN_TIMEOUT"; + + /** */ + private static final Object SPI_STOP = "SPI_STOP"; + + /** */ + private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED"; + + /** Remote nodes. */ + private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>(); + + /** Topology history. */ + private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>(); + + /** Pending ping futures keyed by node ID. */ + private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap8<>(); + + /** Socket writer. */ + private SocketWriter sockWriter; + + /** */ + private SocketReader sockReader; + + /** */ + private boolean segmented; + + /** Last message ID. */ + private volatile IgniteUuid lastMsgId; + + /** Current topology version. */ + private volatile long topVer; + + /** Join error. Contains the error that occurred during the join process. */ + private IgniteSpiException joinErr; + + /** Joined latch.
*/ + private final CountDownLatch joinLatch = new CountDownLatch(1); + + /** Left latch. */ + private final CountDownLatch leaveLatch = new CountDownLatch(1); + + /** */ + private final Timer timer = new Timer("TcpDiscoverySpi.timer"); + + /** */ + protected MessageWorker msgWorker; + + /** + * @param adapter Adapter. + */ + ClientImpl(TcpDiscoverySpi adapter) { + super(adapter); + } + + /** {@inheritDoc} */ + @Override public void dumpDebugInfo(IgniteLogger log) { + StringBuilder b = new StringBuilder(U.nl()); + + b.append(">>>").append(U.nl()); + b.append(">>>").append("Dumping discovery SPI debug info.").append(U.nl()); + b.append(">>>").append(U.nl()); + + b.append("Local node ID: ").append(getLocalNodeId()).append(U.nl()).append(U.nl()); + b.append("Local node: ").append(locNode).append(U.nl()).append(U.nl()); + + b.append("Internal threads: ").append(U.nl()); + + b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); + b.append(" Socket reader: ").append(threadStatus(sockReader)).append(U.nl()); + b.append(" Socket writer: ").append(threadStatus(sockWriter)).append(U.nl()); + + b.append(U.nl()); + + b.append("Nodes: ").append(U.nl()); + + for (ClusterNode node : allVisibleNodes()) + b.append(" ").append(node.id()).append(U.nl()); + + b.append(U.nl()); + + b.append("Stats: ").append(spi.stats).append(U.nl()); + + U.quietAndInfo(log, b.toString()); + } + + /** {@inheritDoc} */ + @Override public String getSpiState() { + + if (sockWriter.isOnline()) + return "connected"; + + return "disconnected"; + } + + /** {@inheritDoc} */ + @Override public int getMessageWorkerQueueSize() { + return msgWorker.queueSize(); + } + + /** {@inheritDoc} */ + @Override public UUID getCoordinator() { + return null; + } + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { + spi.initLocalNode(0, true); + + locNode = spi.locNode; + + sockWriter = new SocketWriter(); + sockWriter.start(); + + sockReader = 
new SocketReader(); + sockReader.start(); + + msgWorker = new MessageWorker(); + msgWorker.start(); + + if (spi.ipFinder.isShared()) + registerLocalNodeAddress(); + + try { + joinLatch.await(); + + if (joinErr != null) + throw joinErr; + } + catch (InterruptedException e) { + throw new IgniteSpiException("Thread has been interrupted.", e); + } + + timer.schedule(new HeartbeatSender(), spi.hbFreq, spi.hbFreq); + + spi.printStartInfo(); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + timer.cancel(); + + if (msgWorker != null && msgWorker.isAlive()) { // Should always be alive + msgWorker.addMessage(SPI_STOP); + + try { + if (!leaveLatch.await(spi.netTimeout, MILLISECONDS)) + U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']'); + } + catch (InterruptedException ignored) { + + } + } + + for (GridFutureAdapter<Boolean> fut : pingFuts.values()) + fut.onDone(false); + + rmtNodes.clear(); + + U.interrupt(msgWorker); + U.interrupt(sockWriter); + U.interrupt(sockReader); + + U.join(msgWorker, log); + U.join(sockWriter, log); + U.join(sockReader, log); + + spi.printStopInfo(); + } + + /** {@inheritDoc} */ + @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> getRemoteNodes() { + return U.arrayList(rmtNodes.values(), TcpDiscoveryNodesRing.VISIBLE_NODES); + } + + /** {@inheritDoc} */ + @Nullable @Override public ClusterNode getNode(UUID nodeId) { + if (getLocalNodeId().equals(nodeId)) + return locNode; + + TcpDiscoveryNode node = rmtNodes.get(nodeId); + + return node != null && node.visible() ? 
node : null; + } + + /** {@inheritDoc} */ + @Override public boolean pingNode(@NotNull final UUID nodeId) { + if (nodeId.equals(getLocalNodeId())) + return true; + + TcpDiscoveryNode node = rmtNodes.get(nodeId); + + if (node == null || !node.visible()) + return false; + + GridFutureAdapter<Boolean> fut = pingFuts.get(nodeId); + + if (fut == null) { + fut = new GridFutureAdapter<>(); + + GridFutureAdapter<Boolean> oldFut = pingFuts.putIfAbsent(nodeId, fut); + + if (oldFut != null) + fut = oldFut; + else { + if (spi.getSpiContext().isStopping()) { + if (pingFuts.remove(nodeId, fut)) + fut.onDone(false); + + return false; + } + + final GridFutureAdapter<Boolean> finalFut = fut; + + timer.schedule(new TimerTask() { + @Override public void run() { + if (pingFuts.remove(nodeId, finalFut)) + finalFut.onDone(false); + } + }, spi.netTimeout); + + sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId)); + } + } + + try { + return fut.get(); + } + catch (IgniteInterruptedCheckedException ignored) { + return false; + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException(e); // Should newer occur + } + } + + /** {@inheritDoc} */ + @Override public void disconnect() throws IgniteSpiException { + U.interrupt(msgWorker); + U.interrupt(sockWriter); + U.interrupt(sockReader); + + U.join(msgWorker, log); + U.join(sockWriter, log); + U.join(sockReader, log); + + leaveLatch.countDown(); + joinLatch.countDown(); + + spi.getSpiContext().deregisterPorts(); + + Collection<ClusterNode> rmts = getRemoteNodes(); + + // This is restart/disconnection and remote nodes are not empty. + // We need to fire FAIL event for each. 
+ DiscoverySpiListener lsnr = spi.lsnr; + + if (lsnr != null) { + for (ClusterNode n : rmts) { + rmtNodes.remove(n.id()); + + Collection<ClusterNode> top = updateTopologyHistory(topVer + 1); + + lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null); + } + } + + rmtNodes.clear(); + } + + /** {@inheritDoc} */ + @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { + if (segmented) + throw new IgniteException("Failed to send custom message: client is disconnected"); + + try { + sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, + spi.marsh.marshal(evt))); + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); + } + } + + /** {@inheritDoc} */ + @Override public void failNode(UUID nodeId) { + ClusterNode node = rmtNodes.get(nodeId); + + if (node != null) { + TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), + node.id(), node.order()); + + msgWorker.addMessage(msg); + } + } + + /** + * @return Opened socket or {@code null} if timeout. 
+ * @see TcpDiscoverySpi#joinTimeout + */ + @SuppressWarnings("BusyWait") + @Nullable private Socket joinTopology(boolean recon) throws IgniteSpiException, InterruptedException { + Collection<InetSocketAddress> addrs = null; + + long startTime = U.currentTimeMillis(); + + while (true) { + if (Thread.currentThread().isInterrupted()) + throw new InterruptedException(); + + while (addrs == null || addrs.isEmpty()) { + addrs = spi.resolvedAddresses(); + + if (!F.isEmpty(addrs)) { + if (log.isDebugEnabled()) + log.debug("Resolved addresses from IP finder: " + addrs); + } + else { + U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + spi.ipFinder); + + if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout) + return null; + + Thread.sleep(2000); + } + } + + Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs); + + Iterator<InetSocketAddress> it = addrs.iterator(); + + while (it.hasNext()) { + if (Thread.currentThread().isInterrupted()) + throw new InterruptedException(); + + InetSocketAddress addr = it.next(); + + Socket sock = null; + + try { + long ts = U.currentTimeMillis(); + + IgniteBiTuple<Socket, UUID> t = initConnection(addr); + + sock = t.get1(); + + UUID rmtNodeId = t.get2(); + + spi.stats.onClientSocketInitialized(U.currentTimeMillis() - ts); + + locNode.clientRouterNodeId(rmtNodeId); + + TcpDiscoveryAbstractMessage msg = recon ? 
+ new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, + lastMsgId) : + new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId())); + + msg.client(true); + + spi.writeToSocket(sock, msg); + + int res = spi.readReceipt(sock, spi.ackTimeout); + + switch (res) { + case RES_OK: + return sock; + + case RES_CONTINUE_JOIN: + case RES_WAIT: + U.closeQuiet(sock); + + break; + + default: + if (log.isDebugEnabled()) + log.debug("Received unexpected response to join request: " + res); + + U.closeQuiet(sock); + } + } + catch (IOException | IgniteCheckedException e) { + if (log.isDebugEnabled()) + U.error(log, "Failed to establish connection with address: " + addr, e); + + U.closeQuiet(sock); + + it.remove(); + } + } + + if (addrs.isEmpty()) { + U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " + + "in 2000ms): " + addrs0); + + if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout) + return null; + + Thread.sleep(2000); + } + } + } + + /** + * @param topVer New topology version. + * @return Latest topology snapshot. + */ + private NavigableSet<ClusterNode> updateTopologyHistory(long topVer) { + this.topVer = topVer; + + NavigableSet<ClusterNode> allNodes = allVisibleNodes(); + + if (!topHist.containsKey(topVer)) { + assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 : + "lastVer=" + topHist.lastKey() + ", newVer=" + topVer; + + topHist.put(topVer, allNodes); + + if (topHist.size() > spi.topHistSize) + topHist.pollFirstEntry(); + + assert topHist.lastKey() == topVer; + assert topHist.size() <= spi.topHistSize; + } + + return allNodes; + } + + /** + * @return All nodes. 
+ */ + private NavigableSet<ClusterNode> allVisibleNodes() { + NavigableSet<ClusterNode> allNodes = new TreeSet<>(); + + for (TcpDiscoveryNode node : rmtNodes.values()) { + if (node.visible()) + allNodes.add(node); + } + + allNodes.add(locNode); + + return allNodes; + } + + /** + * @param addr Address. + * @return Remote node ID. + * @throws IOException In case of I/O error. + * @throws IgniteCheckedException In case of other error. + */ + private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, IgniteCheckedException { + assert addr != null; + + Socket sock = spi.openSocket(addr); + + TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId()); + + req.client(true); + + spi.writeToSocket(sock, req); + + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, spi.ackTimeout); + + UUID nodeId = res.creatorNodeId(); + + assert nodeId != null; + assert !getLocalNodeId().equals(nodeId); + + return F.t(sock, nodeId); + } + + /** {@inheritDoc} */ + @Override void simulateNodeFailure() { + U.warn(log, "Simulating client node failure: " + getLocalNodeId()); + + U.interrupt(sockWriter); + U.interrupt(msgWorker); + + U.join(sockWriter, log); + U.join(msgWorker, log); + } + + /** {@inheritDoc} */ + @Override public void brakeConnection() { + U.closeQuiet(msgWorker.currSock); + } + + /** {@inheritDoc} */ + @Override protected IgniteSpiThread workerThread() { + return msgWorker; + } + + /** + * FOR TEST PURPOSE ONLY! + */ + @SuppressWarnings("BusyWait") + public void waitForClientMessagePrecessed() { + Object last = msgWorker.queue.peekLast(); + + while (last != null && msgWorker.isAlive() && msgWorker.queue.contains(last)) { + try { + Thread.sleep(10); + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Heartbeat sender. 
+     */
+    private class HeartbeatSender extends TimerTask {
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Sends a client heartbeat with the current metrics, but only while the SPI is not
+         * stopping and the socket writer has a socket.
+         */
+        @Override public void run() {
+            if (!spi.getSpiContext().isStopping() && sockWriter.isOnline()) {
+                TcpDiscoveryClientHeartbeatMessage msg = new TcpDiscoveryClientHeartbeatMessage(getLocalNodeId(),
+                    spi.metricsProvider.metrics());
+
+                msg.client(true);
+
+                sockWriter.sendMessage(msg);
+            }
+        }
+    }
+
+    /**
+     * Socket reader.
+     * <p>
+     * Thread that reads discovery messages from the socket handed over via
+     * {@link #setSocket(Socket, UUID)} and passes them to the message worker.
+     */
+    private class SocketReader extends IgniteSpiThread {
+        /** Monitor guarding {@link #sock} and {@link #rmtNodeId}. */
+        private final Object mux = new Object();
+
+        /** Socket to read from; {@code null} while there is nothing to read. */
+        private Socket sock;
+
+        /** ID of the remote node on the other side of {@link #sock}. */
+        private UUID rmtNodeId;
+
+        /**
+         * Creates the reader thread.
+         */
+        protected SocketReader() {
+            super(spi.ignite().name(), "tcp-client-disco-sock-reader", log);
+        }
+
+        /**
+         * Hands a new socket to the reader and wakes it up.
+         *
+         * @param sock Socket.
+         * @param rmtNodeId Rmt node id.
+         */
+        public void setSocket(Socket sock, UUID rmtNodeId) {
+            synchronized (mux) {
+                this.sock = sock;
+
+                this.rmtNodeId = rmtNodeId;
+
+                mux.notifyAll();
+            }
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Waits until a socket is set, then reads messages from it until an I/O error occurs or
+         * the thread is interrupted. On an I/O error a {@code SocketClosedMessage} is queued for
+         * the message worker. The socket is always closed afterwards.
+         */
+        @Override protected void body() throws InterruptedException {
+            while (!isInterrupted()) {
+                Socket sock;
+                UUID rmtNodeId;
+
+                synchronized (mux) {
+                    if (this.sock == null) {
+                        mux.wait();
+
+                        continue; // Re-check the interrupt flag and the socket after wake-up.
+                    }
+
+                    sock = this.sock;
+                    rmtNodeId = this.rmtNodeId;
+                }
+
+                try {
+                    InputStream in = new BufferedInputStream(sock.getInputStream());
+
+                    sock.setKeepAlive(true);
+                    sock.setTcpNoDelay(true);
+
+                    while (!isInterrupted()) {
+                        TcpDiscoveryAbstractMessage msg;
+
+                        try {
+                            msg = spi.marsh.unmarshal(in, U.gridClassLoader());
+                        }
+                        catch (IgniteCheckedException e) {
+                            if (log.isDebugEnabled())
+                                U.error(log, "Failed to read message [sock=" + sock + ", " +
+                                    "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']', e);
+
+                            IOException ioEx = X.cause(e, IOException.class);
+
+                            if (ioEx != null)
+                                throw ioEx; // Handled by the outer catch as a broken connection.
+
+                            ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class);
+
+                            if (clsNotFoundEx != null)
+                                LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
+                                    "(make sure same versions of all classes are available on all nodes) " +
+                                    "[rmtNodeId=" + rmtNodeId + ", err=" + clsNotFoundEx.getMessage() + ']');
+                            else
+                                LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" +
+                                    getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']');
+
+                            continue; // Non-I/O failure: skip this message and keep reading.
+                        }
+
+                        msg.senderNodeId(rmtNodeId);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Message has been received: " + msg);
+
+                        spi.stats.onMessageReceived(msg);
+
+                        if (spi.ensured(msg))
+                            lastMsgId = msg.id(); // Sent back to the router in the reconnect message.
+
+                        msgWorker.addMessage(msg);
+                    }
+                }
+                catch (IOException e) {
+                    msgWorker.addMessage(new SocketClosedMessage(sock));
+
+                    if (log.isDebugEnabled())
+                        U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e);
+                }
+                finally {
+                    U.closeQuiet(sock);
+
+                    synchronized (mux) {
+                        if (this.sock == sock) { // Do not drop a newer socket set in the meantime.
+                            this.sock = null;
+                            this.rmtNodeId = null;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Thread that writes queued discovery messages to the current socket.
+     */
+    private class SocketWriter extends IgniteSpiThread {
+        /** Monitor guarding {@link #sock} and {@link #queue}. */
+        private final Object mux = new Object();
+
+        /** Socket to write to; {@code null} while there is no connection. */
+        private Socket sock;
+
+        /** Messages waiting to be written. */
+        private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
+
+        /**
+         * Creates the writer thread.
+         */
+        protected SocketWriter() {
+            super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
+        }
+
+        /**
+         * Queues a message and wakes up the writer.
+         *
+         * @param msg Message.
+         */
+        private void sendMessage(TcpDiscoveryAbstractMessage msg) {
+            synchronized (mux) {
+                queue.add(msg);
+
+                mux.notifyAll();
+            }
+        }
+
+        /**
+         * @param sock Socket.
+ */ + private void setSocket(Socket sock) { + synchronized (mux) { + this.sock = sock; + + mux.notifyAll(); + } + } + + /** + * + */ + public boolean isOnline() { + synchronized (mux) { + return sock != null; + } + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + TcpDiscoveryAbstractMessage msg = null; + + while (!Thread.currentThread().isInterrupted()) { + Socket sock; + + synchronized (mux) { + sock = this.sock; + + if (sock == null) { + mux.wait(); + + continue; + } + + if (msg == null) + msg = queue.poll(); + + if (msg == null) { + mux.wait(); + + continue; + } + } + + for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs) + msgLsnr.apply(msg); + + try { + spi.writeToSocket(sock, msg); + + msg = null; + } + catch (IOException e) { + if (log.isDebugEnabled()) + U.error(log, "Failed to send node left message (will stop anyway) [sock=" + sock + ']', e); + + U.closeQuiet(sock); + + synchronized (mux) { + if (sock == this.sock) + this.sock = null; // Connection has dead. 
+ } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + msg, e); + + msg = null; + } + } + } + } + + /** + * + */ + private class Reconnector extends IgniteSpiThread { + /** */ + private volatile Socket sock; + + /** + * + */ + protected Reconnector() { + super(spi.ignite().name(), "tcp-client-disco-msg-worker", log); + } + + /** + * + */ + public void cancel() { + interrupt(); + + U.closeQuiet(sock); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + assert !segmented; + + boolean success = false; + + try { + sock = joinTopology(true); + + if (sock == null) { + U.error(log, "Failed to reconnect to cluster: timeout."); + + return; + } + + if (isInterrupted()) + throw new InterruptedException(); + + InputStream in = new BufferedInputStream(sock.getInputStream()); + + sock.setKeepAlive(true); + sock.setTcpNoDelay(true); + + // Wait for + while (!isInterrupted()) { + TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader()); + + if (msg instanceof TcpDiscoveryClientReconnectMessage) { + TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg; + + if (res.creatorNodeId().equals(getLocalNodeId())) { + if (res.success()) { + msgWorker.addMessage(res); + + success = true; + } + + break; + } + } + + } + } + catch (IOException | IgniteCheckedException e) { + U.error(log, "Failed to reconnect", e); + } + finally { + if (!success) { + U.closeQuiet(sock); + + msgWorker.addMessage(SPI_RECONNECT_FAILED); + } + } + } + } + + /** + * Message worker. + */ + protected class MessageWorker extends IgniteSpiThread { + /** Message queue. */ + private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>(); + + /** */ + private Socket currSock; + + /** Indicates that pending messages are currently processed. 
*/
+        private boolean pending;
+
+        /** Reconnector started after the current socket was closed; reset to {@code null} after a successful reconnect. */
+        private Reconnector reconnector;
+
+        /**
+         * Creates the message worker thread.
+         */
+        private MessageWorker() {
+            super(spi.ignite().name(), "tcp-client-disco-msg-worker", log);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Joins the topology and then processes queued messages one by one. The queue carries
+         * the control markers {@code JOIN_TIMEOUT}, {@code SPI_STOP} and {@code SPI_RECONNECT_FAILED},
+         * {@code SocketClosedMessage} instances, and discovery messages. The loop is left when
+         * joining fails or when the thread is interrupted while waiting on the queue.
+         */
+        @SuppressWarnings("InfiniteLoopStatement")
+        @Override protected void body() throws InterruptedException {
+            spi.stats.onJoinStarted();
+
+            try {
+                final Socket sock = joinTopology(false);
+
+                if (sock == null) {
+                    joinErr = new IgniteSpiException("Join process timed out");
+
+                    joinLatch.countDown();
+
+                    return;
+                }
+
+                currSock = sock;
+
+                sockWriter.setSocket(sock);
+
+                timer.schedule(new TimerTask() {
+                    @Override public void run() {
+                        if (joinLatch.getCount() > 0) // Join is still in progress after spi.netTimeout.
+                            queue.add(JOIN_TIMEOUT);
+                    }
+                }, spi.netTimeout);
+
+                sockReader.setSocket(sock, locNode.clientRouterNodeId());
+
+                while (true) {
+                    Object msg = queue.take();
+
+                    if (msg == JOIN_TIMEOUT) {
+                        if (joinLatch.getCount() > 0) { // Ignored if the join completed in the meantime.
+                            joinErr = new IgniteSpiException("Join process timed out [sock=" + sock +
+                                ", timeout=" + spi.netTimeout + ']');
+
+                            joinLatch.countDown();
+
+                            break;
+                        }
+                    }
+                    else if (msg == SPI_STOP) {
+                        assert spi.getSpiContext().isStopping();
+
+                        if (currSock != null) {
+                            TcpDiscoveryAbstractMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
+
+                            leftMsg.client(true);
+
+                            sockWriter.sendMessage(leftMsg);
+                        }
+                        else
+                            leaveLatch.countDown(); // No connection to announce the leave on.
+                    }
+                    else if (msg instanceof SocketClosedMessage) {
+                        if (((SocketClosedMessage)msg).sock == currSock) { // Notifications for other sockets are ignored.
+                            currSock = null;
+
+                            if (joinLatch.getCount() > 0) {
+                                joinErr = new IgniteSpiException("Failed to connect to cluster: socket closed.");
+
+                                joinLatch.countDown();
+
+                                break;
+                            }
+                            else {
+                                if (spi.getSpiContext().isStopping() || segmented)
+                                    leaveLatch.countDown();
+                                else {
+                                    assert reconnector == null;
+
+                                    final Reconnector reconnector = new Reconnector();
+                                    this.reconnector = reconnector;
+                                    reconnector.start();
+
+                                    timer.schedule(new TimerTask() {
+                                        @Override public void run() {
+                                            if (reconnector.isAlive()) // Still running after spi.netTimeout.
+                                                reconnector.cancel();
+                                        }
+                                    }, spi.netTimeout);
+                                }
+                            }
+                        }
+                    }
+                    else if (msg == SPI_RECONNECT_FAILED) {
+                        if (!segmented) {
+                            segmented = true;
+
+                            reconnector.cancel();
+                            reconnector.join();
+
+                            notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                        }
+                    }
+                    else {
+                        TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
+
+                        if (joinLatch.getCount() > 0) { // While joining, these replies turn into a join error.
+                            IgniteSpiException err = null;
+
+                            if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
+                                err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
+                            else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage)
+                                err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
+                            else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage)
+                                err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
+
+                            if (err != null) {
+                                joinErr = err;
+
+                                joinLatch.countDown();
+
+                                break;
+                            }
+                        }
+
+                        processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg);
+                    }
+                }
+            }
+            finally {
+                U.closeQuiet(currSock);
+
+                if (joinLatch.getCount() > 0) {
+                    // This should not occurs.
+                    joinErr = new IgniteSpiException("Some error occurs in joinig process");
+
+                    joinLatch.countDown();
+                }
+
+                if (reconnector != null) {
+                    reconnector.cancel();
+
+                    reconnector.join();
+                }
+            }
+        }
+
+        /**
+         * @param msg Message.
+ */ + protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) { + assert msg != null; + assert msg.verified() || msg.senderNodeId() == null; + + spi.stats.onMessageProcessingStarted(msg); + + if (msg instanceof TcpDiscoveryNodeAddedMessage) + processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); + else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) + processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg); + else if (msg instanceof TcpDiscoveryNodeLeftMessage) + processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg); + else if (msg instanceof TcpDiscoveryNodeFailedMessage) + processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg); + else if (msg instanceof TcpDiscoveryHeartbeatMessage) + processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg); + else if (msg instanceof TcpDiscoveryClientReconnectMessage) + processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg); + else if (msg instanceof TcpDiscoveryCustomEventMessage) + processCustomMessage((TcpDiscoveryCustomEventMessage)msg); + else if (msg instanceof TcpDiscoveryClientPingResponse) + processClientPingResponse((TcpDiscoveryClientPingResponse)msg); + else if (msg instanceof TcpDiscoveryPingRequest) + processPingRequest(); + + spi.stats.onMessageProcessingFinished(msg); + } + + /** + * @param msg Message. 
+ */ + private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { + if (spi.getSpiContext().isStopping()) + return; + + TcpDiscoveryNode node = msg.node(); + + UUID newNodeId = node.id(); + + if (getLocalNodeId().equals(newNodeId)) { + if (joinLatch.getCount() > 0) { + Collection<TcpDiscoveryNode> top = msg.topology(); + + if (top != null) { + spi.gridStartTime = msg.gridStartTime(); + + for (TcpDiscoveryNode n : top) { + if (n.order() > 0) + n.visible(true); + + rmtNodes.put(n.id(), n); + } + + topHist.clear(); + + if (msg.topologyHistory() != null) + topHist.putAll(msg.topologyHistory()); + } + else if (log.isDebugEnabled()) + log.debug("Discarding node added message with empty topology: " + msg); + } + else if (log.isDebugEnabled()) + log.debug("Discarding node added message (this message has already been processed) " + + "[msg=" + msg + ", locNode=" + locNode + ']'); + } + else { + boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null; + + if (topChanged) { + if (log.isDebugEnabled()) + log.debug("Added new node to topology: " + node); + + Map<Integer, byte[]> data = msg.newNodeDiscoveryData(); + + if (data != null) + spi.onExchange(newNodeId, newNodeId, data, null); + } + } + } + + /** + * @param msg Message. 
+ */ + private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) { + if (spi.getSpiContext().isStopping()) + return; + + if (getLocalNodeId().equals(msg.nodeId())) { + if (joinLatch.getCount() > 0) { + Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData(); + + if (dataMap != null) { + for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) + spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), null); + } + + locNode.setAttributes(msg.clientNodeAttributes()); + locNode.visible(true); + + long topVer = msg.topologyVersion(); + + locNode.order(topVer); + + notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer)); + + joinErr = null; + + joinLatch.countDown(); + + spi.stats.onJoinFinished(); + } + else if (log.isDebugEnabled()) + log.debug("Discarding node add finished message (this message has already been processed) " + + "[msg=" + msg + ", locNode=" + locNode + ']'); + } + else { + TcpDiscoveryNode node = rmtNodes.get(msg.nodeId()); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']'); + + return; + } + + long topVer = msg.topologyVersion(); + + node.order(topVer); + node.visible(true); + + if (spi.locNodeVer.equals(node.version())) + node.version(spi.locNodeVer); + + NavigableSet<ClusterNode> top = updateTopologyHistory(topVer); + + if (!pending && joinLatch.getCount() > 0) { + if (log.isDebugEnabled()) + log.debug("Discarding node add finished message (join process is not finished): " + msg); + + return; + } + + notifyDiscovery(EVT_NODE_JOINED, topVer, node, top); + + spi.stats.onNodeJoined(); + } + } + + /** + * @param msg Message. 
+ */ + private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) { + if (getLocalNodeId().equals(msg.creatorNodeId())) { + if (log.isDebugEnabled()) + log.debug("Received node left message for local node: " + msg); + + leaveLatch.countDown(); + } + else { + if (spi.getSpiContext().isStopping()) + return; + + TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId()); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Discarding node left message since node is not found [msg=" + msg + ']'); + + return; + } + + NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion()); + + if (!pending && joinLatch.getCount() > 0) { + if (log.isDebugEnabled()) + log.debug("Discarding node left message (join process is not finished): " + msg); + + return; + } + + notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top); + + spi.stats.onNodeLeft(); + } + } + + /** + * @param msg Message. + */ + private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) { + if (spi.getSpiContext().isStopping()) { + if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId())) { + if (leaveLatch.getCount() > 0) { + log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId() + + ", rmtNode=" + msg.creatorNodeId() + ']'); + + leaveLatch.countDown(); + } + } + + return; + } + + if (!getLocalNodeId().equals(msg.creatorNodeId())) { + TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId()); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Discarding node failed message since node is not found [msg=" + msg + ']'); + + return; + } + + NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion()); + + if (!pending && joinLatch.getCount() > 0) { + if (log.isDebugEnabled()) + log.debug("Discarding node failed message (join process is not finished): " + msg); + + return; + } + + notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top); + + 
spi.stats.onNodeFailed(); + } + } + + /** + * @param msg Message. + */ + private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) { + if (spi.getSpiContext().isStopping()) + return; + + if (getLocalNodeId().equals(msg.creatorNodeId())) { + assert msg.senderNodeId() != null; + + if (log.isDebugEnabled()) + log.debug("Received heartbeat response: " + msg); + } + else { + long tstamp = U.currentTimeMillis(); + + if (msg.hasMetrics()) { + for (Map.Entry<UUID, TcpDiscoveryHeartbeatMessage.MetricsSet> e : msg.metrics().entrySet()) { + UUID nodeId = e.getKey(); + + TcpDiscoveryHeartbeatMessage.MetricsSet metricsSet = e.getValue(); + + Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics() ? + msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap(); + + updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp); + + for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics()) + updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp); + } + } + } + } + + /** + * @param msg Message. + */ + private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) { + if (spi.getSpiContext().isStopping()) + return; + + if (getLocalNodeId().equals(msg.creatorNodeId())) { + assert msg.success(); + + currSock = reconnector.sock; + + sockWriter.setSocket(currSock); + sockReader.setSocket(currSock, locNode.clientRouterNodeId()); + + reconnector = null; + + pending = true; + + try { + for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) + processDiscoveryMessage(pendingMsg); + } + finally { + pending = false; + } + } + else if (log.isDebugEnabled()) + log.debug("Discarding reconnect message for another client: " + msg); + } + + /** + * @param msg Message. 
+ */ + private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { + if (msg.verified() && joinLatch.getCount() == 0) { + DiscoverySpiListener lsnr = spi.lsnr; + + if (lsnr != null) { + UUID nodeId = msg.creatorNodeId(); + + TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId); + + if (node != null && node.visible()) { + try { + DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh); + + notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj); + } + catch (Throwable e) { + U.error(log, "Failed to unmarshal discovery custom message.", e); + } + } + else if (log.isDebugEnabled()) + log.debug("Received metrics from unknown node: " + nodeId); + } + } + } + + /** + * @param msg Message. + */ + private void processClientPingResponse(TcpDiscoveryClientPingResponse msg) { + GridFutureAdapter<Boolean> fut = pingFuts.remove(msg.nodeToPing()); + + if (fut != null) + fut.onDone(msg.result()); + } + + /** + * Router want to ping this client. + */ + private void processPingRequest() { + TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(getLocalNodeId()); + + res.client(true); + + sockWriter.sendMessage(res); + } + + /** + * @param nodeId Node ID. + * @param metrics Metrics. + * @param cacheMetrics Cache metrics. + * @param tstamp Timestamp. + */ + private void updateMetrics(UUID nodeId, + ClusterMetrics metrics, + Map<Integer, CacheMetrics> cacheMetrics, + long tstamp) + { + assert nodeId != null; + assert metrics != null; + assert cacheMetrics != null; + + TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? 
locNode : rmtNodes.get(nodeId); + + if (node != null && node.visible()) { + node.setMetrics(metrics); + node.setCacheMetrics(cacheMetrics); + + node.lastUpdateTime(tstamp); + + notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes()); + } + else if (log.isDebugEnabled()) + log.debug("Received metrics from unknown node: " + nodeId); + } + + /** + * @param type Event type. + * @param topVer Topology version. + * @param node Node. + * @param top Topology snapshot. + */ + private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top) { + notifyDiscovery(type, topVer, node, top, null); + } + + /** + * @param type Event type. + * @param topVer Topology version. + * @param node Node. + * @param top Topology snapshot. + */ + private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top, + @Nullable DiscoverySpiCustomMessage data) { + DiscoverySpiListener lsnr = spi.lsnr; + + if (lsnr != null) { + if (log.isDebugEnabled()) + log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) + + ", topVer=" + topVer + ']'); + + lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist), data); + } + else if (log.isDebugEnabled()) + log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) + + ", topVer=" + topVer + ']'); + } + + /** + * @param msg Message. + */ + public void addMessage(Object msg) { + queue.add(msg); + } + + /** + * + */ + public int queueSize() { + return queue.size(); + } + } + + /** + * + */ + private static class SocketClosedMessage { + /** */ + private final Socket sock; + + /** + * @param sock Socket. + */ + private SocketClosedMessage(Socket sock) { + this.sock = sock; + } + } +}