# ignite-21
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dae4b942 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dae4b942 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dae4b942 Branch: refs/heads/ignite-21 Commit: dae4b9428cfe04fb2d0f78020e7f0ea29eb37c04 Parents: 20872dc Author: sboikov <sboi...@gridgain.com> Authored: Wed Dec 10 10:41:56 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Dec 10 13:22:50 2014 +0300 ---------------------------------------------------------------------- .../rest/protocols/tcp/GridMockNioSession.java | 11 + .../communication/tcp/TcpCommunicationSpi.java | 1151 +++++++++++++++--- .../tcp/TcpCommunicationSpiMBean.java | 52 +- .../discovery/tcp/TcpDiscoverySpiAdapter.java | 3 +- .../GridTcpCommunicationMessageAdapter.java | 7 + .../GridTcpCommunicationMessageFactory.java | 10 +- .../grid/util/nio/GridCommunicationClient.java | 3 +- .../grid/util/nio/GridNioFinishedFuture.java | 5 + .../gridgain/grid/util/nio/GridNioFuture.java | 15 +- .../grid/util/nio/GridNioFutureImpl.java | 7 +- .../util/nio/GridNioRecoveryDescriptor.java | 356 ++++++ .../gridgain/grid/util/nio/GridNioServer.java | 167 ++- .../gridgain/grid/util/nio/GridNioSession.java | 10 + .../grid/util/nio/GridNioSessionImpl.java | 10 + .../util/nio/GridSelectorNioSessionImpl.java | 97 ++ .../util/nio/GridShmemCommunicationClient.java | 4 +- .../util/nio/GridTcpCommunicationClient.java | 4 +- .../util/nio/GridTcpNioCommunicationClient.java | 39 +- .../GridAbstractCommunicationSelfTest.java | 85 +- .../spi/communication/GridTestMessage.java | 15 +- .../GridTcpCommunicationSpiAbstractTest.java | 26 +- ...mmunicationSpiConcurrentConnectSelfTest.java | 398 ++++++ .../GridTcpCommunicationSpiConfigSelfTest.java | 4 + ...cpCommunicationSpiMultithreadedSelfTest.java | 77 +- ...pCommunicationSpiMultithreadedShmemTest.java | 2 +- 
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 426 +++++++ ...GridTcpCommunicationSpiRecoverySelfTest.java | 713 +++++++++++ .../GridTcpCommunicationSpiShmemSelfTest.java | 2 +- ...cpCommunicationSpiTcpNoDelayOffSelfTest.java | 20 + .../managers/GridManagerStopSelfTest.java | 2 + .../grid/spi/GridSpiStartStopAbstractTest.java | 2 + .../nio/impl/GridNioFilterChainSelfTest.java | 10 + .../gridgain/testframework/GridTestNode.java | 12 +- .../GridSpiCommunicationSelfTestSuite.java | 8 +- 34 files changed, 3434 insertions(+), 319 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java b/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java index 7071ed5..bf82f22 100644 --- a/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java +++ b/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java @@ -10,6 +10,7 @@ package org.gridgain.grid.kernal.processors.rest.protocols.tcp; import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.nio.*; +import org.jetbrains.annotations.*; import java.net.*; @@ -132,4 +133,14 @@ public class GridMockNioSession extends GridMetadataAwareAdapter implements Grid @Override public boolean readsPaused() { return false; } + + /** {@inheritDoc} */ + @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() { + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index a80421e..1911b5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -24,6 +24,7 @@ import org.gridgain.grid.kernal.managers.eventstorage.*; import org.apache.ignite.spi.communication.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.direct.*; +import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.ipc.*; import org.gridgain.grid.util.ipc.shmem.*; import org.gridgain.grid.util.lang.*; @@ -51,7 +52,7 @@ import static org.apache.ignite.events.IgniteEventType.*; * TCP/IP protocol and Java NIO to communicate with other nodes. * <p> * To enable communication with other nodes, this SPI adds {@link #ATTR_ADDRS} - * and {@link #ATTR_PORT} local node attributes (see {@link org.apache.ignite.cluster.ClusterNode#attributes()}. + * and {@link #ATTR_PORT} local node attributes (see {@link ClusterNode#attributes()}. * <p> * At startup, this SPI tries to start listening to local port specified by * {@link #setLocalPort(int)} method. 
If local port is occupied, then SPI will @@ -84,9 +85,7 @@ import static org.apache.ignite.events.IgniteEventType.*; * <li>Direct or heap buffer allocation for sending (see {@link #setDirectSendBuffer(boolean)})</li> * <li>Count of selectors and selector threads for NIO server (see {@link #setSelectorsCount(int)})</li> * <li>{@code TCP_NODELAY} socket option for sockets (see {@link #setTcpNoDelay(boolean)})</li> - * <li>Async message sending (see {@link #setAsyncSend(boolean)})</li> * <li>Message queue limit (see {@link #setMessageQueueLimit(int)})</li> - * <li>Dual socket connection (see {@link #setDualSocketConnection(boolean)})</li> * <li>Minimum buffered message count (see {@link #setMinimumBufferedMessageCount(int)})</li> * <li>Buffer size ratio (see {@link #setBufferSizeRatio(double)})</li> * <li>Connect timeout (see {@link #setConnectTimeout(long)})</li> @@ -95,6 +94,9 @@ import static org.apache.ignite.events.IgniteEventType.*; * <li>Local port to accept shared memory connections (see {@link #setSharedMemoryPort(int)})</li> * <li>Socket receive buffer size (see {@link #setSocketReceiveBuffer(int)})</li> * <li>Socket send buffer size (see {@link #setSocketSendBuffer(int)})</li> + * <li>Socket write timeout (see {@link #setSocketWriteTimeout(long)})</li> + * <li>Number of received messages after which acknowledgment is sent (see {@link #setAckSendThreshold(int)})</li> + * <li>Maximum number of unacknowledged messages (see {@link #setUnacknowledgedMessagesBufferSize(int)})</li> * </ul> * <h2 class="header">Java Example</h2> * GridTcpCommunicationSpi is used by default and should be explicitly configured @@ -131,7 +133,7 @@ import static org.apache.ignite.events.IgniteEventType.*; * <img src="http://www.gridgain.com/images/spring-small.png"> * <br> * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> - * @see org.apache.ignite.spi.communication.CommunicationSpi + * @see CommunicationSpi */ 
@IgniteSpiMultipleInstancesSupport(true) @IgniteSpiConsistencyChecked(optional = false) @@ -187,9 +189,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Default message queue limit per connection (for incoming and outgoing . */ public static final int DFLT_MSG_QUEUE_LIMIT = GridNioServer.DFLT_SEND_QUEUE_LIMIT; - /** Default value for dualSocketConnection flag. */ - public static final boolean DFLT_DUAL_SOCKET_CONNECTION = false; - /** * Default count of selectors for TCP server equals to * {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}. @@ -211,6 +210,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */ public static final boolean DFLT_TCP_NODELAY = true; + /** Default received messages threshold for sending ack. */ + public static final int DFLT_ACK_SND_THRESHOLD = 512; + + /** Default socket write timeout. */ + public static final long DFLT_SOCK_WRITE_TIMEOUT = GridNioServer.DFLT_SES_WRITE_TIMEOUT; + /** No-op runnable. */ private static final IgniteRunnable NOOP = new IgniteRunnable() { @Override public void run() { @@ -221,9 +226,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Node ID message type. */ public static final byte NODE_ID_MSG_TYPE = -1; + /** */ + public static final byte RECOVERY_LAST_ID_MSG_TYPE = -2; + + /** */ + public static final byte HANDSHAKE_MSG_TYPE = -3; + /** Server listener. 
*/ private final GridNioServerListener<GridTcpCommunicationMessageAdapter> srvLsnr = new GridNioServerListenerAdapter<GridTcpCommunicationMessageAdapter>() { + @Override public void onSessionWriteTimeout(GridNioSession ses) { + LT.warn(log, null, "Communication SPI Session write timed out (consider increasing " + + "'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() + + ", writeTimeout=" + sockWriteTimeout + ']'); + + if (log.isDebugEnabled()) + log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() + + ", writeTimeout=" + sockWriteTimeout + ']'); + + ses.close(); + } + @Override public void onConnected(GridNioSession ses) { if (ses.accepted()) { if (log.isDebugEnabled()) @@ -231,8 +254,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ses.send(nodeIdMsg); } - else - assert asyncSnd; } @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { @@ -243,9 +264,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (rmv instanceof GridTcpNioCommunicationClient && ((GridTcpNioCommunicationClient)rmv).session() == ses && - clients.remove(id, rmv)) + clients.remove(id, rmv)) { rmv.forceClose(); + if (!stopping) { + GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor(); + + if (recoveryData != null) { + if (recoveryData.nodeAlive(getSpiContext().node(id))) { + if (!recoveryData.messagesFutures().isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Session was closed but there are unacknowledged messages, " + + "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); + + recoveryWorker.addReconnectRequest(recoveryData); + } + } + else + recoveryData.onNodeLeft(); + } + } + } + CommunicationListener<GridTcpCommunicationMessageAdapter> lsnr0 = lsnr; if (lsnr0 != null) @@ -259,20 +299,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (sndId == null) { assert ses.accepted(); - assert msg instanceof 
NodeIdMessage; + if (msg instanceof NodeIdMessage) + sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0); + else { + assert msg instanceof HandshakeMessage : msg; - sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0); + sndId = ((HandshakeMessage)msg).nodeId(); + } if (log.isDebugEnabled()) log.debug("Remote node ID received: " + sndId); - UUID old = ses.addMeta(NODE_ID_META, sndId); + final UUID old = ses.addMeta(NODE_ID_META, sndId); assert old == null; - IgniteProductVersion locVer = getSpiContext().localNode().version(); - - ClusterNode rmtNode = getSpiContext().node(sndId); + final ClusterNode rmtNode = getSpiContext().node(sndId); if (rmtNode == null) { ses.close(); @@ -280,25 +322,110 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return; } + ClusterNode locNode = getSpiContext().localNode(); + + IgniteProductVersion locVer = locNode.version(); + IgniteProductVersion rmtVer = rmtNode.version(); if (!locVer.equals(rmtVer)) ses.addMeta(GridNioServer.DIFF_VER_NODE_ID_META_KEY, sndId); - if (asyncSnd && ses.remoteAddress() != null && !dualSockConn) { - Object sync = locks.tryLock(sndId); + if (ses.remoteAddress() == null) + return; + + GridCommunicationClient oldClient = clients.get(sndId); + + boolean hasShmemClient = false; + + if (oldClient != null) { + if (oldClient instanceof GridTcpNioCommunicationClient) { + if (log.isDebugEnabled()) + log.debug("Received incoming connection when already connected " + + "to this node, rejecting [locNode=" + locNode.id() + + ", rmtNode=" + sndId + ']'); + + ses.send(new RecoveryLastReceivedMessage(-1)); + + return; + } + else { + assert oldClient instanceof GridShmemCommunicationClient; + + hasShmemClient = true; + } + } + + GridFutureAdapterEx<GridCommunicationClient> fut = new GridFutureAdapterEx<>(); + + GridFutureAdapterEx<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut); + + assert msg instanceof HandshakeMessage : msg; + + HandshakeMessage msg0 = (HandshakeMessage)msg; 
+ + final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode); + + if (oldFut == null) { + oldClient = clients.get(sndId); + + if (oldClient != null) { + if (oldClient instanceof GridTcpNioCommunicationClient) { + if (log.isDebugEnabled()) + log.debug("Received incoming connection when already connected " + + "to this node, rejecting [locNode=" + locNode.id() + + ", rmtNode=" + sndId + ']'); + + ses.send(new RecoveryLastReceivedMessage(-1)); + + return; + } + else { + assert oldClient instanceof GridShmemCommunicationClient; + + hasShmemClient = true; + } + } + + boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); - if (sync != null) { + if (log.isDebugEnabled()) + log.debug("Received incoming connection from remote node " + + "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']'); + + if (reserved) { try { - if (clients.get(sndId) == null) { - if (log.isDebugEnabled()) - log.debug("Will reuse session for node: " + sndId); + GridTcpNioCommunicationClient client = + connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); - clients.put(sndId, new GridTcpNioCommunicationClient(ses)); - } + fut.onDone(client); } finally { - locks.unlock(sndId, sync); + clientFuts.remove(rmtNode.id(), fut); + } + } + } + else { + if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) { + if (log.isDebugEnabled()) { + log.debug("Received incoming connection from remote node while " + + "connecting to this node, rejecting [locNode=" + locNode.id() + + ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() + + ", rmtNodeOrder=" + rmtNode.order() + ']'); + } + + ses.send(new RecoveryLastReceivedMessage(-1)); + } + else { + boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); + + if (reserved) { + GridTcpNioCommunicationClient client = + 
connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); + + fut.onDone(client); } } } @@ -306,6 +433,35 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else { rcvdMsgsCnt.increment(); + GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor(); + + if (recovery != null) { + if (msg instanceof RecoveryLastReceivedMessage) { + RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg; + + if (log.isDebugEnabled()) + log.debug("Received recovery acknowledgement [rmtNode=" + sndId + + ", rcvCnt=" + msg0.received() + ']'); + + recovery.ackReceived(msg0.received()); + + return; + } + else { + long rcvCnt = recovery.onReceived(); + + if (rcvCnt % ackSndThreshold == 0) { + if (log.isDebugEnabled()) + log.debug("Send recovery acknowledgement [rmtNode=" + sndId + + ", rcvCnt=" + rcvCnt + ']'); + + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt)); + + recovery.lastAcknowledged(rcvCnt); + } + } + } + IgniteRunnable c; if (msgQueueLimit > 0) { @@ -328,6 +484,135 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter notifyListener(sndId, msg, c); } } + + /** + * @param recovery Recovery descriptor. + * @param ses Session. + * @param node Node. + * @param rcvCnt Number of received messages.. + * @param sndRes If {@code true} sends response for recovery handshake. + * @param createClient If {@code true} creates NIO communication client. + * @return Client. 
+ */ + private GridTcpNioCommunicationClient connected( + GridNioRecoveryDescriptor recovery, + GridNioSession ses, + ClusterNode node, + long rcvCnt, + boolean sndRes, + boolean createClient) { + recovery.onHandshake(rcvCnt); + + ses.recoveryDescriptor(recovery); + + nioSrvr.resend(ses); + + if (sndRes) + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.receivedCount())); + + recovery.connected(); + + GridTcpNioCommunicationClient client = null; + + if (createClient) { + client = new GridTcpNioCommunicationClient(ses, log); + + GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); + + assert oldClient == null; + } + + return client; + } + + /** + * + */ + @SuppressWarnings("PackageVisibleInnerClass") + class ConnectClosure implements IgniteInClosure<Boolean> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final GridNioSession ses; + + /** */ + private final GridNioRecoveryDescriptor recoveryDesc; + + /** */ + private final ClusterNode rmtNode; + + /** */ + private final HandshakeMessage msg; + + /** */ + private final GridFutureAdapterEx<GridCommunicationClient> fut; + + /** */ + private final boolean createClient; + + /** + * @param ses Incoming session. + * @param recoveryDesc Recovery descriptor. + * @param rmtNode Remote node. + * @param msg Handshake message. + * @param createClient If {@code true} creates NIO communication client.. + * @param fut Connect future. 
+ */ + ConnectClosure(GridNioSession ses, + GridNioRecoveryDescriptor recoveryDesc, + ClusterNode rmtNode, + HandshakeMessage msg, + boolean createClient, + GridFutureAdapterEx<GridCommunicationClient> fut) { + this.ses = ses; + this.recoveryDesc = recoveryDesc; + this.rmtNode = rmtNode; + this.msg = msg; + this.createClient = createClient; + this.fut = fut; + } + + /** {@inheritDoc} */ + @Override public void apply(Boolean success) { + if (success) { + IgniteInClosure<GridNioFuture<?>> lsnr = new IgniteInClosure<GridNioFuture<?>>() { + @Override public void apply(GridNioFuture<?> msgFut) { + try { + msgFut.get(); + + GridTcpNioCommunicationClient client = + connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); + + fut.onDone(client); + } + catch (GridException | IOException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send recovery handshake " + + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); + + recoveryDesc.release(); + + fut.onDone(); + } + finally { + clientFuts.remove(rmtNode.id(), fut); + } + } + }; + + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr); + } + else { + try { + fut.onDone(); + } + finally { + clientFuts.remove(rmtNode.id(), fut); + } + } + } + } }; /** Logger. */ @@ -402,9 +687,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Buffer size ratio. */ private double bufSizeRatio = IgniteSystemProperties.getDouble(GG_COMMUNICATION_BUF_RESIZE_RATIO, 0.8); - /** Dual socket connection flag. */ - private boolean dualSockConn = DFLT_DUAL_SOCKET_CONNECTION; - /** NIO server. */ private GridNioServer<GridTcpCommunicationMessageAdapter> nioSrvr; @@ -414,8 +696,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@code TCP_NODELAY} option value for created sockets. */ private boolean tcpNoDelay = DFLT_TCP_NODELAY; - /** Use async client flag. 
*/ - private boolean asyncSnd = true; + /** Number of received messages after which acknowledgment is sent. */ + private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD; + + /** Maximum number of unacknowledged messages. */ + private int unackedMsgsBufSize; + + /** Socket write timeout. */ + private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT; /** Shared memory accept worker. */ private ShmemAcceptWorker shmemAcceptWorker; @@ -429,6 +717,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Socket timeout worker. */ private SocketTimeoutWorker sockTimeoutWorker; + /** Recovery worker. */ + private RecoveryWorker recoveryWorker; + /** Shared memory workers. */ private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>(); @@ -468,6 +759,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Context initialization latch. */ private final CountDownLatch ctxInitLatch = new CountDownLatch(1); + /** Stopping flag. */ + private volatile boolean stopping; + /** metrics listener. */ private final GridNioMetricsListener metricsLsnr = new GridNioMetricsListener() { @Override public void onBytesSent(int bytesCnt) { @@ -479,8 +773,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } }; - /** Locks. */ - private final GridKeyLock locks = new GridKeyLock(); + /** Client connect futures. */ + private final ConcurrentMap<UUID, GridFutureAdapterEx<GridCommunicationClient>> clientFuts = + GridConcurrentFactory.newMap(); + + /** */ + private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap(); /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @@ -696,6 +994,58 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return idleConnTimeout; } + /** {@inheritDoc} */ + @Override public long getSocketWriteTimeout() { + return sockWriteTimeout; + } + + /** + * Sets socket write timeout for TCP connection. 
If message can not be written to + * socket within this time then connection is closed and reconnect is attempted. + * <p> + * Default to {@link #DFLT_SOCK_WRITE_TIMEOUT}. + * + * @param sockWriteTimeout Socket write timeout for TCP connection. + */ + @IgniteSpiConfiguration(optional = true) + public void setSocketWriteTimeout(long sockWriteTimeout) { + this.sockWriteTimeout = sockWriteTimeout; + } + + /** {@inheritDoc} */ + @Override public int getAckSendThreshold() { + return ackSndThreshold; + } + + /** + * Sets number of received messages per connection to node after which acknowledgment message is sent. + * <p> + * Default to {@link #DFLT_ACK_SND_THRESHOLD}. + * + * @param ackSndThreshold Number of received messages after which acknowledgment is sent. + */ + @IgniteSpiConfiguration(optional = true) + public void setAckSendThreshold(int ackSndThreshold) { + this.ackSndThreshold = ackSndThreshold; + } + + /** {@inheritDoc} */ + @Override public int getUnacknowledgedMessagesBufferSize() { + return unackedMsgsBufSize; + } + + /** + * Sets maximum number of stored unacknowledged messages per connection to node. + * If number of unacknowledged messages exceeds this number then connection to node is + * closed and reconnect is attempted. + * + * @param unackedMsgsBufSize Maximum number of unacknowledged messages. + */ + @IgniteSpiConfiguration(optional = true) + public void setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) { + this.unackedMsgsBufSize = unackedMsgsBufSize; + } + /** * Sets connection buffer size. If set to {@code 0} connection buffer is disabled. * <p> @@ -838,24 +1188,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return selectorsCnt; } - /** {@inheritDoc} */ - @Override public boolean isAsyncSend() { - return asyncSnd; - } - - /** - * Sets flag defining whether asynchronous (NIO) or synchronous (blocking) IO - * should be used to send messages. - * <p> - * If not provided, default value is {@code true}. 
- * - * @param asyncSnd {@code True} if asynchronous IO should be used to send messages. - */ - @IgniteSpiConfiguration(optional = true) - public void setAsyncSend(boolean asyncSnd) { - this.asyncSnd = asyncSnd; - } - /** * Sets value for {@code TCP_NODELAY} socket option. Each * socket will be opened using provided value. @@ -915,33 +1247,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** - * Sets flag indicating whether dual-socket connection between nodes should be enforced. If set to - * {@code true}, two separate connections will be established between communicating nodes: one for outgoing - * messages, and one for incoming. When set to {@code false}, single {@code TCP} connection will be used - * for both directions. - * <p> - * This flag is useful on some operating systems, when {@code TCP_NODELAY} flag is disabled and - * messages take too long to get delivered. - * <p> - * If not provided, default is {@code false}. - * - * @param dualSockConn Whether dual-socket connection should be enforced. - */ - @IgniteSpiConfiguration(optional = true) - public void setDualSocketConnection(boolean dualSockConn) { - this.dualSockConn = dualSockConn; - } - - /** {@inheritDoc} */ - @Override public boolean isDualSocketConnection() { - return dualSockConn; - } - - /** * Sets message queue limit for incoming and outgoing messages. * <p> - * This parameter only used when {@link #isAsyncSend()} set to {@code true}. - * <p> * When set to positive number send queue is limited to the configured value. * {@code 0} disables the size limitations. * <p> @@ -963,7 +1270,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * Sets the minimum number of messages for this SPI, that are buffered * prior to sending. 
* <p> - * Defaults to either {@code 512} or {@link org.apache.ignite.IgniteSystemProperties#GG_MIN_BUFFERED_COMMUNICATION_MSG_CNT} + * Defaults to either {@code 512} or {@link IgniteSystemProperties#GG_MIN_BUFFERED_COMMUNICATION_MSG_CNT} * system property (if specified). * * @param minBufferedMsgCnt Minimum buffered message count. @@ -982,7 +1289,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * Sets the buffer size ratio for this SPI. As messages are sent, * the buffer size is adjusted using this ratio. * <p> - * Defaults to either {@code 0.8} or {@link org.apache.ignite.IgniteSystemProperties#GG_COMMUNICATION_BUF_RESIZE_RATIO} + * Defaults to either {@code 0.8} or {@link IgniteSystemProperties#GG_COMMUNICATION_BUF_RESIZE_RATIO} * system property (if specified). * * @param bufSizeRatio Buffer size ratio. @@ -1064,6 +1371,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assertParameter(bufSizeRatio > 0 && bufSizeRatio < 1, "bufSizeRatio > 0 && bufSizeRatio < 1"); assertParameter(connTimeout >= 0, "connTimeout >= 0"); assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout"); + assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0"); + assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0"); + assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0"); + + if (unackedMsgsBufSize > 0) { + assertParameter(unackedMsgsBufSize >= msgQueueLimit * 5, + "Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'."); + + assertParameter(unackedMsgsBufSize >= ackSndThreshold * 5, + "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'."); + } try { locHost = U.resolveLocalHost(locAddr); @@ -1125,18 +1443,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("connBufSize", connBufSize)); log.debug(configInfo("connBufFlushFreq", connBufFlushFreq)); log.debug(configInfo("selectorsCnt", selectorsCnt)); - 
log.debug(configInfo("asyncSend", asyncSnd)); log.debug(configInfo("tcpNoDelay", tcpNoDelay)); log.debug(configInfo("sockSndBuf", sockSndBuf)); log.debug(configInfo("sockRcvBuf", sockRcvBuf)); log.debug(configInfo("shmemPort", shmemPort)); log.debug(configInfo("msgQueueLimit", msgQueueLimit)); - log.debug(configInfo("dualSockConn", dualSockConn)); log.debug(configInfo("minBufferedMsgCnt", minBufferedMsgCnt)); log.debug(configInfo("bufSizeRatio", bufSizeRatio)); log.debug(configInfo("connTimeout", connTimeout)); log.debug(configInfo("maxConnTimeout", maxConnTimeout)); log.debug(configInfo("reconCnt", reconCnt)); + log.debug(configInfo("sockWriteTimeout", sockWriteTimeout)); + log.debug(configInfo("ackSndThreshold", ackSndThreshold)); + log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize)); } if (connBufSize > 8192) @@ -1162,6 +1481,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter idleClientWorker.start(); + recoveryWorker = new RecoveryWorker(); + + recoveryWorker.start(); + if (connBufSize > 0) { clientFlushWorker = new ClientFlushWorker(); @@ -1242,6 +1565,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .directMode(true) .metricsListener(metricsLsnr) .messageWriter(msgWriter) + .writeTimeout(sockWriteTimeout) .filters(new GridNioCodecFilter(new GridDirectParser(msgReader, this), log, true), new GridConnectionBytesVerifyFilter(log)) .build(); @@ -1321,6 +1645,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public void spiStop() throws IgniteSpiException { + assert stopping; + unregisterMBean(); // Stop TCP server. 
@@ -1333,10 +1659,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter U.interrupt(idleClientWorker); U.interrupt(clientFlushWorker); U.interrupt(sockTimeoutWorker); + U.interrupt(recoveryWorker); U.join(idleClientWorker, log); U.join(clientFlushWorker, log); U.join(sockTimeoutWorker, log); + U.join(recoveryWorker, log); U.cancel(shmemWorkers); U.join(shmemWorkers, log); @@ -1360,6 +1688,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override protected void onContextDestroyed0() { + stopping = true; + if (ctxInitLatch.getCount() > 0) // Safety. ctxInitLatch.countDown(); @@ -1428,20 +1758,33 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient client = null; try { - client = reserveClient(node); + boolean retry; + + do { + client = reserveClient(node); - UUID nodeId = null; + UUID nodeId = null; - if (!client.async() && !getSpiContext().localNode().version().equals(node.version())) - nodeId = node.id(); + if (!client.async() && !getSpiContext().localNode().version().equals(node.version())) + nodeId = node.id(); - client.sendMessage(nodeId, msg); + retry = client.sendMessage(nodeId, msg); - client.release(); + client.release(); - client = null; + client = null; + + if (!retry) + sentMsgsCnt.increment(); + else { + ClusterNode node0 = getSpiContext().node(node.id()); - sentMsgsCnt.increment(); + if (node0 == null) + throw new GridException("Failed to send message to remote node " + + "(node has left the grid): " + node.id()); + } + } + while (retry); } catch (GridException e) { throw new IgniteSpiException("Failed to send message to remote node: " + node, e); @@ -1469,23 +1812,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient client = clients.get(nodeId); if (client == null) { + if (stopping) + throw new IgniteSpiException("Grid is stopping."); + // Do not allow concurrent connects. 
- Object sync = locks.lock(nodeId); + GridFutureAdapterEx<GridCommunicationClient> fut = new ConnectFuture(); - try { - client = clients.get(nodeId); + GridFutureAdapterEx<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(nodeId, fut); + + if (oldFut == null) { + try { + GridCommunicationClient client0 = clients.get(nodeId); + + if (client0 == null) { + client0 = createNioClient(node); + + if (client0 != null) { + GridCommunicationClient old = clients.put(nodeId, client0); + + assert old == null; + } + else + U.sleep(200); + } - if (client == null) { - GridCommunicationClient old = clients.put(nodeId, client = createNioClient(node)); + fut.onDone(client0); + } + catch (Throwable e) { + fut.onDone(e); - assert old == null; + if (e instanceof Error) + throw (Error)e; + } + finally { + clientFuts.remove(nodeId, fut); } } - finally { - locks.unlock(nodeId, sync); - } + else + fut = oldFut; - assert client != null; + client = fut.get(); + + if (client == null) + continue; if (getSpiContext().node(nodeId) == null) { if (clients.remove(nodeId, client)) @@ -1570,7 +1939,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { - safeHandshake(client, node.id(), connTimeout0); + safeHandshake(client, null, node.id(), connTimeout0); } catch (HandshakeTimeoutException e) { if (log.isDebugEnabled()) @@ -1661,59 +2030,74 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter while (!conn) { // Reconnection on handshake timeout. 
try { - if (asyncSnd) { - SocketChannel ch = SocketChannel.open(); + SocketChannel ch = SocketChannel.open(); - ch.configureBlocking(true); + ch.configureBlocking(true); - ch.socket().setTcpNoDelay(tcpNoDelay); - ch.socket().setKeepAlive(true); + ch.socket().setTcpNoDelay(tcpNoDelay); + ch.socket().setKeepAlive(true); - if (sockRcvBuf > 0) - ch.socket().setReceiveBufferSize(sockRcvBuf); + if (sockRcvBuf > 0) + ch.socket().setReceiveBufferSize(sockRcvBuf); - if (sockSndBuf > 0) - ch.socket().setSendBufferSize(sockSndBuf); + if (sockSndBuf > 0) + ch.socket().setSendBufferSize(sockSndBuf); - ch.socket().connect(addr, (int)connTimeout); + GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node); - safeHandshake(ch, node.id(), connTimeout0); + if (!recoveryDesc.reserve()) { + U.closeQuiet(ch); - UUID diffVerNodeId = null; + return null; + } - IgniteProductVersion locVer = getSpiContext().localNode().version(); - IgniteProductVersion rmtVer = node.version(); + long rcvCnt = -1; - if (!locVer.equals(rmtVer)) - diffVerNodeId = node.id(); + try { + ch.socket().connect(addr, (int)connTimeout); - GridNioSession ses = nioSrvr.createSession( - ch, - F.asMap( - NODE_ID_META, node.id(), - GridNioServer.DIFF_VER_NODE_ID_META_KEY, diffVerNodeId) - ).get(); + rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0); - client = new GridTcpNioCommunicationClient(ses); + if (rcvCnt == -1) + return null; } - else { - client = new GridTcpCommunicationClient( - metricsLsnr, - msgWriter, - addr, - locHost, - connTimeout, - tcpNoDelay, - sockRcvBuf, - sockSndBuf, - connBufSize, - minBufferedMsgCnt, - bufSizeRatio); - - safeHandshake(client, node.id(), connTimeout0); + finally { + if (recoveryDesc != null && rcvCnt == -1) + recoveryDesc.release(); } - conn = true; + UUID diffVerNodeId = null; + + IgniteProductVersion locVer = getSpiContext().localNode().version(); + IgniteProductVersion rmtVer = node.version(); + + if (!locVer.equals(rmtVer)) + diffVerNodeId = node.id(); + 
+ try { + Map<Integer, Object> meta = new HashMap<>(); + + meta.put(NODE_ID_META, node.id()); + meta.put(GridNioServer.DIFF_VER_NODE_ID_META_KEY, diffVerNodeId); + + if (recoveryDesc != null) { + recoveryDesc.onHandshake(rcvCnt); + + meta.put(-1, recoveryDesc); + } + + GridNioSession ses = nioSrvr.createSession(ch, meta).get(); + + client = new GridTcpNioCommunicationClient(ses, log); + + conn = true; + } + finally { + if (!conn) { + if (recoveryDesc != null) + recoveryDesc.release(); + } + } } catch (HandshakeTimeoutException e) { if (client != null) { @@ -1812,16 +2196,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * Performs handshake in timeout-safe way. * * @param client Client. + * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}. * @param rmtNodeId Remote node. * @param timeout Timeout for handshake. * @throws GridException If handshake failed or wasn't completed withing timeout. + * @return Handshake response. */ @SuppressWarnings("ThrowFromFinallyBlock") - private <T> void safeHandshake(T client, UUID rmtNodeId, long timeout) throws GridException { + private <T> long safeHandshake( + T client, + @Nullable GridNioRecoveryDescriptor recovery, + UUID rmtNodeId, + long timeout + ) throws GridException { HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); sockTimeoutWorker.addTimeoutObject(obj); + long rcvCnt = 0; + try { if (client instanceof GridCommunicationClient) ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId)); @@ -1851,9 +2244,62 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug("Received remote node ID: " + rmtNodeId0); ch.write(ByteBuffer.wrap(U.GG_HEADER)); - ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType)); - success = true; + if (recovery != null) { + HandshakeMessage msg = new HandshakeMessage(locNodeId, + recovery.incrementConnectCount(), + recovery.receivedCount()); + + if 
(log.isDebugEnabled()) + log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']'); + + buf = ByteBuffer.allocate(33); + + buf.order(ByteOrder.nativeOrder()); + + boolean written = msg.writeTo(buf); + + assert written; + + buf.flip(); + + ch.write(buf); + } + else + ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType)); + + if (recovery != null) { + if (log.isDebugEnabled()) + log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']'); + + buf = ByteBuffer.allocate(9); + + buf.order(ByteOrder.nativeOrder()); + + for (int i = 0; i < 9; ) { + int read = ch.read(buf); + + if (read == -1) + throw new GridException("Failed to read remote node recovery handshake " + + "(connection closed)."); + + i += read; + } + + rcvCnt = buf.getLong(1); + + if (log.isDebugEnabled()) + log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']'); + + if (rcvCnt == -1) { + if (log.isDebugEnabled()) + log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']'); + } + else + success = true; + } + else + success = true; } catch (IOException e) { if (log.isDebugEnabled()) @@ -1878,6 +2324,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter throw new HandshakeTimeoutException("Failed to perform handshake due to timeout (consider increasing " + "'connectionTimeout' configuration property)."); } + + return rcvCnt; } /** @@ -1896,11 +2344,83 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "is node stopping?) [senderNodeId=" + sndId + ", msg=" + msg + ']'); } + /** + * @param node Node. + * @return Recovery receive data for given node. + */ + private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) { + ClientKey id = new ClientKey(node.id(), node.order()); + + GridNioRecoveryDescriptor recovery = recoveryDescs.get(id); + + if (recovery == null) { + int maxSize = Math.max(msgQueueLimit, ackSndThreshold); + + int queueLimit = unackedMsgsBufSize != 0 ? 
unackedMsgsBufSize : (maxSize * 5); + + GridNioRecoveryDescriptor old = + recoveryDescs.put(id, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log)); + + if (old != null) + recovery = old; + } + + return recovery; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpCommunicationSpi.class, this); } + /** + * + */ + private static class ClientKey { + /** */ + private UUID nodeId; + + /** */ + private long order; + + /** + * @param nodeId Node ID. + * @param order Node order. + */ + private ClientKey(UUID nodeId, long order) { + this.nodeId = nodeId; + this.order = order; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + ClientKey other = (ClientKey)obj; + + return order == other.order && nodeId.equals(other.nodeId); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = nodeId.hashCode(); + + res = 31 * res + (int)(order ^ (order >>> 32)); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClientKey.class, this); + } + } + /** Internal exception class for proper timeout handling. 
*/ private static class HandshakeTimeoutException extends GridException { /** */ @@ -2032,12 +2552,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @SuppressWarnings({"BusyWait"}) @Override protected void body() throws InterruptedException { while (!isInterrupted()) { + cleanupRecovery(); + for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) { UUID nodeId = e.getKey(); GridCommunicationClient client = e.getValue(); - if (getSpiContext().node(nodeId) == null) { + ClusterNode node = getSpiContext().node(nodeId); + + if (node == null) { if (log.isDebugEnabled()) log.debug("Forcing close of non-existent node connection: " + nodeId); @@ -2048,9 +2572,39 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter continue; } + GridNioRecoveryDescriptor recovery = null; + + if (client instanceof GridTcpNioCommunicationClient) { + recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); + + if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { + RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); + + if (log.isDebugEnabled()) + log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + + ", rcvCnt=" + msg.received() + ']'); + + nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); + + recovery.lastAcknowledged(msg.received()); + + continue; + } + } + long idleTime = client.getIdleTime(); if (idleTime >= idleConnTimeout) { + if (recovery != null && + recovery.nodeAlive(getSpiContext().node(nodeId)) && + !recovery.messagesFutures().isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Node connection is idle, but there are unacknowledged messages, " + + "will wait: " + nodeId); + + continue; + } + if (log.isDebugEnabled()) log.debug("Closing idle node connection: " + nodeId); @@ -2062,6 +2616,38 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter Thread.sleep(idleConnTimeout); } } + + /** + * + */ + private void 
cleanupRecovery() { + Set<ClientKey> left = null; + + for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) { + if (left != null && left.contains(e.getKey())) + continue; + + GridNioRecoveryDescriptor recoverySnd = e.getValue(); + + if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) { + if (left == null) + left = new HashSet<>(); + + left.add(e.getKey()); + } + } + + if (left != null) { + assert !left.isEmpty(); + + for (ClientKey id : left) { + GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id); + + if (recoverySnd != null) + recoverySnd.onNodeLeft(); + } + } + } } /** @@ -2213,6 +2799,84 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * */ + private class RecoveryWorker extends IgniteSpiThread { + /** */ + private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>(); + + /** + * + */ + private RecoveryWorker() { + super(gridName, "tcp-comm-recovery-worker", log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + if (log.isDebugEnabled()) + log.debug("Recovery worker has been started."); + + while (!isInterrupted()) { + GridNioRecoveryDescriptor recoveryDesc = q.take(); + + assert recoveryDesc != null; + + ClusterNode node = recoveryDesc.node(); + + if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) + continue; + + try { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); + + GridCommunicationClient client = reserveClient(node); + + client.release(); + } + catch (GridException e) { + if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, will retry " + + "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + + addReconnectRequest(recoveryDesc); + } + else if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, " + + "node 
left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + + } + } + } + + /** + * @param recoverySnd Recovery send data. + */ + void addReconnectRequest(GridNioRecoveryDescriptor recoverySnd) { + boolean add = q.add(recoverySnd); + + assert add; + } + } + + /** + * + */ + private static class ConnectFuture extends GridFutureAdapterEx<GridCommunicationClient> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public ConnectFuture() { + // No-op. + } + } + + /** + * + */ private static class HandshakeTimeoutObject<T> { /** */ private static final AtomicLong idGen = new AtomicLong(); @@ -2355,6 +3019,206 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** + * Handshake message. + */ + @SuppressWarnings("PublicInnerClass") + public static class HandshakeMessage extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private UUID nodeId; + + /** */ + private long rcvCnt; + + /** */ + private long connectCnt; + + /** + * Default constructor required by {@link GridTcpCommunicationMessageAdapter}. + */ + public HandshakeMessage() { + // No-op. + } + + /** + * @param nodeId Node ID. + * @param connectCnt Connect count. + * @param rcvCnt Number of received messages. + */ + public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) { + assert nodeId != null; + assert rcvCnt >= 0 : rcvCnt; + + this.nodeId = nodeId; + this.connectCnt = connectCnt; + this.rcvCnt = rcvCnt; + } + + /** + * @return Connect count. + */ + public long connectCount() { + return connectCnt; + } + + /** + * @return Number of received messages. + */ + public long received() { + return rcvCnt; + } + + /** + * @return Node ID. 
+ */ + public UUID nodeId() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf) { + if (buf.remaining() < 33) + return false; + + buf.put(HANDSHAKE_MSG_TYPE); + + byte[] bytes = U.uuidToBytes(nodeId); + + assert bytes.length == 16 : bytes.length; + + buf.put(bytes); + + buf.putLong(rcvCnt); + + buf.putLong(connectCnt); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf) { + if (buf.remaining() < 32) + return false; + + byte[] nodeIdBytes = new byte[16]; + + buf.get(nodeIdBytes); + + nodeId = U.bytesToUuid(nodeIdBytes, 0); + + rcvCnt = buf.getLong(); + + connectCnt = buf.getLong(); + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return HANDSHAKE_MSG_TYPE; + } + + /** {@inheritDoc} */ + @SuppressWarnings("CloneDoesntCallSuperClone") + @Override public GridTcpCommunicationMessageAdapter clone() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter msg) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HandshakeMessage.class, this); + } + } + + /** + * Recovery acknowledgment message. + */ + @SuppressWarnings("PublicInnerClass") + public static class RecoveryLastReceivedMessage extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long rcvCnt; + + /** + * Default constructor required by {@link GridTcpCommunicationMessageAdapter}. + */ + public RecoveryLastReceivedMessage() { + // No-op. + } + + /** + * @param rcvCnt Number of received messages. + */ + public RecoveryLastReceivedMessage(long rcvCnt) { + this.rcvCnt = rcvCnt; + } + + /** + * @return Number of received messages. 
+ */ + public long received() { + return rcvCnt; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf) { + if (buf.remaining() < 9) + return false; + + buf.put(RECOVERY_LAST_ID_MSG_TYPE); + + buf.putLong(rcvCnt); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf) { + if (buf.remaining() < 8) + return false; + + rcvCnt = buf.getLong(); + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return RECOVERY_LAST_ID_MSG_TYPE; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter msg) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean skipRecovery() { + return true; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RecoveryLastReceivedMessage.class, this); + } + } + + /** * Node ID message. */ @SuppressWarnings("PublicInnerClass") @@ -2426,5 +3290,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { // No-op. 
} + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(NodeIdMessage.class, this); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java index b35e735..1255925 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java @@ -70,15 +70,6 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean { public int getSelectorsCount(); /** - * Gets flag defining whether asynchronous (NIO) or synchronous (blocking) IO - * should be used to send messages. - * - * @return {@code True} if asynchronous IO should be used to send messages. - */ - @IgniteMBeanDescription("Asynchronous send.") - public boolean isAsyncSend(); - - /** * Gets sent messages count. * * @return Sent messages count. @@ -220,23 +211,7 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean { public int getSocketSendBuffer(); /** - * Gets flag indicating whether dual-socket connection between nodes should be enforced. If set to - * {@code true}, two separate connections will be established between communicating nodes: one for outgoing - * messages, and one for incoming. When set to {@code false}, single {@code TCP} connection will be used - * for both directions. - * <p> - * This flag is useful on some operating systems, when {@code TCP_NODELAY} flag is disabled and - * messages take too long to get delivered. - * - * @return Whether dual-socket connection should be enforced. 
- */ - @IgniteMBeanDescription("Dual-socket connection.") - public boolean isDualSocketConnection(); - - /** * Gets message queue limit for incoming and outgoing messages. - * <p> - * This parameter only used when {@link #isAsyncSend()} set to {@code true}. * * @return Send queue size limit. */ @@ -260,4 +235,31 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean { */ @IgniteMBeanDescription("Buffer size ratio.") public double getBufferSizeRatio(); + + /** + * Gets socket write timeout for TCP connections. If message can not be written to + * socket within this time then connection is closed and reconnect is attempted. + * + * @return Socket write timeout for TCP connections. + */ + @IgniteMBeanDescription("Socket write timeout.") + public long getSocketWriteTimeout(); + + /** + * Gets number of received messages per connection to node after which acknowledgment message is sent. + * + * @return Number of received messages after which acknowledgment is sent. + */ + @IgniteMBeanDescription("Number of received messages after which acknowledgment is sent.") + public int getAckSendThreshold(); + + /** + * Gets maximum number of stored unacknowledged messages per connection to node. + * If number of unacknowledged messages exceeds this number then connection to node is + * closed and reconnect is attempted. + * + * @return Maximum number of unacknowledged messages. 
+ */ + @IgniteMBeanDescription("Maximum number of unacknowledged messages.") + public int getUnacknowledgedMessagesBufferSize(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java index 5377e18..ffa7b3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java @@ -310,7 +310,8 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov @Override protected void onContextDestroyed0() { super.onContextDestroyed0(); - ipFinder.onSpiContextDestroyed(); + if (ipFinder != null) + ipFinder.onSpiContextDestroyed(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java index 8c479ff..af63a2f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java @@ -168,6 +168,13 @@ public abstract class GridTcpCommunicationMessageAdapter implements Serializable protected abstract void clone0(GridTcpCommunicationMessageAdapter _msg); /** + * @return {@code True} if should skip recovery for 
this message. + */ + public boolean skipRecovery() { + return false; + } + + /** * @param arr Array. * @return Array iterator. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java index 460c827..7c92065 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java @@ -282,8 +282,14 @@ public class GridTcpCommunicationMessageFactory { * @return New message. */ public static GridTcpCommunicationMessageAdapter create(byte type) { - return type == TcpCommunicationSpi.NODE_ID_MSG_TYPE ? 
new TcpCommunicationSpi.NodeIdMessage() : - create0(type); + if (type == TcpCommunicationSpi.NODE_ID_MSG_TYPE) + return new TcpCommunicationSpi.NodeIdMessage(); + else if (type == TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE) + return new TcpCommunicationSpi.RecoveryLastReceivedMessage(); + else if (type == TcpCommunicationSpi.HANDSHAKE_MSG_TYPE) + return new TcpCommunicationSpi.HandshakeMessage(); + else + return create0(type); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java index 4128475..5b0db53 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java @@ -87,8 +87,9 @@ public interface GridCommunicationClient { * @param nodeId Node ID (provided only if versions of local and remote nodes are different). * @param msg Message to send. * @throws GridException If failed. + * @return {@code True} if should try to resend message. */ - void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws GridException; + boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws GridException; /** * @param timeout Timeout. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java index 980b548..d6fcb60 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java @@ -99,4 +99,9 @@ public class GridNioFinishedFuture<R> implements GridNioFuture<R> { @Override public boolean messageThread() { return msgThread; } + + /** {@inheritDoc} */ + @Override public boolean skipRecovery() { + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java index 2775c55..a3ab1ef 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java @@ -28,7 +28,7 @@ public interface GridNioFuture<R> { * * @return Operation result. * @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted. - * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled. + * @throws IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled. * @throws GridException If operation failed. * @throws IOException If IOException occurred while performing operation. 
*/ @@ -42,8 +42,8 @@ public interface GridNioFuture<R> { * @param timeout The maximum time to wait in milliseconds. * @return Operation result. * @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted. - * @throws org.apache.ignite.lang.IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out. - * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled. + * @throws IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out. + * @throws IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled. * @throws GridException If operation failed. * @throws IOException If IOException occurred while performing operation. */ @@ -57,8 +57,8 @@ public interface GridNioFuture<R> { * @param unit The time unit of the {@code timeout} argument. * @return Operation result. * @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted. - * @throws org.apache.ignite.lang.IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out. - * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled. + * @throws IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out. + * @throws IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled. * @throws GridException If operation failed. * @throws IOException If IOException occurred while performing operation. */ @@ -104,4 +104,9 @@ public interface GridNioFuture<R> { * @return {@code True} if future was created in thread that was processing message. */ public boolean messageThread(); + + /** + * @return {@code True} if skip recovery for this operation. 
+ */ + public boolean skipRecovery(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java index 3d5c4e9..7ab2e14 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java @@ -118,7 +118,7 @@ public class GridNioFutureImpl<R> extends AbstractQueuedSynchronizer implements * @param nanosTimeout Timeout (nanoseconds). * @return Result. * @throws InterruptedException If interrupted. - * @throws org.apache.ignite.lang.IgniteFutureTimeoutException If timeout reached before computation completed. + * @throws IgniteFutureTimeoutException If timeout reached before computation completed. * @throws GridException If error occurred. 
*/ @Nullable protected R get0(long nanosTimeout) throws InterruptedException, GridException { @@ -307,6 +307,11 @@ public class GridNioFutureImpl<R> extends AbstractQueuedSynchronizer implements } /** {@inheritDoc} */ + @Override public boolean skipRecovery() { + return false; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNioFutureImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioRecoveryDescriptor.java new file mode 100644 index 0000000..846a388 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioRecoveryDescriptor.java @@ -0,0 +1,356 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.util.nio; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Recovery information for a single remote node: unacknowledged message futures, acknowledged/received message counters and connection reservation state. + */ +public class GridNioRecoveryDescriptor { + /** Number of acknowledged messages. */ + private long acked; + + /** Unacknowledged message futures. */ + private final ArrayDeque<GridNioFuture<?>> msgFuts; + + /** Number of messages to resend. */ + private int resendCnt; + + /** Number of received messages. */ + private long rcvCnt; + + /** Reserved flag.
*/ + private boolean reserved; + + /** Last acknowledged message. */ + private long lastAck; + + /** Node left flag. */ + private boolean nodeLeft; + + /** Target node. */ + private final ClusterNode node; + + /** Logger. */ + private final IgniteLogger log; + + /** Incoming connection request from remote node. */ + private IgniteBiTuple<Long, IgniteInClosure<Boolean>> handshakeReq; + + /** Connected flag. */ + private boolean connected; + + /** Number of outgoing connect attempts. */ + private long connectCnt; + + /** Maximum size of unacknowledged messages queue. */ + private final int queueLimit; + + /** + * @param queueLimit Maximum size of unacknowledged messages queue. + * @param node Node. + * @param log Logger. + */ + public GridNioRecoveryDescriptor(int queueLimit, ClusterNode node, IgniteLogger log) { + assert !node.isLocal() : node; + assert queueLimit > 0; + + msgFuts = new ArrayDeque<>(queueLimit); + + this.queueLimit = queueLimit; + this.node = node; + this.log = log; + } + + /** + * @return Value of the outgoing connect attempts counter before this call incremented it. + */ + public long incrementConnectCount() { + return connectCnt++; + } + + /** + * @return Node. + */ + public ClusterNode node() { + return node; + } + + /** + * Increments received messages counter. + * + * @return Number of received messages. + */ + public long onReceived() { + rcvCnt++; + + return rcvCnt; + } + + /** + * @return Number of received messages. + */ + public long received() { + return rcvCnt; + } + + /** + * @param lastAck Last acknowledged message. + */ + public void lastAcknowledged(long lastAck) { + this.lastAck = lastAck; + } + + /** + * @return Last acknowledged message. + */ + public long lastAcknowledged() { + return lastAck; + } + + /** + * @return Received messages count. + */ + public long receivedCount() { + return rcvCnt; + } + + /** + * @return Maximum size of unacknowledged messages queue. + */ + public int queueLimit() { + return queueLimit; + } + + /** + * @param fut NIO future.
+ * @return {@code False} only if {@code fut} was queued and the unacknowledged messages queue size reached the queue limit. + */ + public boolean add(GridNioFuture<?> fut) { + assert fut != null; + + if (!fut.skipRecovery()) { + if (resendCnt == 0) { + msgFuts.addLast(fut); + + return msgFuts.size() < queueLimit; + } + else + resendCnt--; + } + + return true; + } + + /** + * @param rcvCnt Number of messages received by remote node; futures of queued messages up to this count are removed and completed. + */ + public void ackReceived(long rcvCnt) { + if (log.isDebugEnabled()) + log.debug("Handle acknowledgment [acked=" + acked + ", rcvCnt=" + rcvCnt + + ", msgFuts=" + msgFuts.size() + ']'); + + while (acked < rcvCnt) { + GridNioFuture<?> fut = msgFuts.pollFirst(); + + assert fut != null; + + ((GridNioFutureImpl)fut).onDone(); + + acked++; + } + } + + /** + * Node left callback: fails queued message futures right away unless the descriptor is reserved (then it is done on release). + */ + public void onNodeLeft() { + GridNioFuture<?>[] futs = null; + + synchronized (this) { + nodeLeft = true; + + if (!reserved && !msgFuts.isEmpty()) { + futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]); + + msgFuts.clear(); + } + } + + if (futs != null) + completeOnNodeLeft(futs); + } + + /** + * @return Message futures for unacknowledged messages. + */ + public Deque<GridNioFuture<?>> messagesFutures() { + return msgFuts; + } + + /** + * @param node Node. + * @return {@code True} if node is not null and has the same order as initial remote node. + */ + public boolean nodeAlive(@Nullable ClusterNode node) { + return node != null && node.order() == this.node.order(); + } + + /** + * @throws InterruptedException If interrupted. + * @return {@code True} if reserved. + */ + public boolean reserve() throws InterruptedException { + synchronized (this) { + while (!connected && reserved) + wait(); + + if (!connected) + reserved = true; + + return !connected; + } + } + + /** + * @param rcvCnt Number of messages received by remote node; these are acknowledged and all still-queued messages are marked for resend.
+ */ + public void onHandshake(long rcvCnt) { + ackReceived(rcvCnt); + + resendCnt = msgFuts.size(); + } + + /** + * Marks descriptor as connected, rejects a pending incoming handshake request (if any) and wakes up threads waiting in {@link #reserve()}. + */ + public void connected() { + synchronized (this) { + assert reserved; + assert !connected; + + connected = true; + + if (handshakeReq != null) { + IgniteInClosure<Boolean> c = handshakeReq.get2(); + + assert c != null; + + c.apply(false); + + handshakeReq = null; + } + + notifyAll(); + } + } + + /** + * Clears connected flag and either hands the reservation over to a pending handshake request or releases it; if the node has left, fails queued message futures. + */ + public void release() { + GridNioFuture<?>[] futs = null; + + synchronized (this) { + connected = false; + + if (handshakeReq != null) { + IgniteInClosure<Boolean> c = handshakeReq.get2(); + + assert c != null; + + handshakeReq = null; + + c.apply(true); + } + else { + reserved = false; + + notifyAll(); + } + + if (nodeLeft && !msgFuts.isEmpty()) { + futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]); + + msgFuts.clear(); + } + } + + if (futs != null) + completeOnNodeLeft(futs); + } + + /** + * @param id Handshake ID. + * @param c Closure invoked with {@code false} if the request is rejected (immediately, when superseded by a request with greater ID, or on connect) or with {@code true} when the reservation is handed over on release; not invoked if this method returns {@code true}. + * @return {@code True} if reserved. + */ + public boolean tryReserve(long id, IgniteInClosure<Boolean> c) { + synchronized (this) { + if (connected) { + c.apply(false); + + return false; + } + + if (reserved) { + if (handshakeReq != null) { + assert handshakeReq.get1() != null; + + long id0 = handshakeReq.get1(); + + assert id0 != id : id0; + + if (id > id0) { + IgniteInClosure<Boolean> c0 = handshakeReq.get2(); + + assert c0 != null; + + c0.apply(false); + + handshakeReq = new IgniteBiTuple<>(id, c); + } + else + c.apply(false); + } + else + handshakeReq = new IgniteBiTuple<>(id, c); + + return false; + } + else { + reserved = true; + + return true; + } + } + } + + /** + * @param futs Futures to complete.
+ */ + private void completeOnNodeLeft(GridNioFuture<?>[] futs) { + for (GridNioFuture<?> msg : futs) + ((GridNioFutureImpl)msg).onDone(new IOException("Failed to send message, node has left: " + node.id())); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNioRecoveryDescriptor.class, this); + } +}