# IGNITE-499 Refactoring: convert anonymous class to inner class.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/22dcbb34 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/22dcbb34 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/22dcbb34 Branch: refs/heads/ignite-499_1 Commit: 22dcbb3490d891cccc447e5520d4d7e17d269968 Parents: 3a2d5c2 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Fri Apr 17 14:41:05 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Fri Apr 17 14:41:05 2015 +0300 ---------------------------------------------------------------------- .../communication/tcp/TcpCommunicationSpi.java | 718 ++++++++++--------- 1 file changed, 363 insertions(+), 355 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/22dcbb34/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 8312638..98db27d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -221,361 +221,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public static final byte HANDSHAKE_MSG_TYPE = -3; /** Server listener. 
*/ - private final GridNioServerListener<Message> srvLsnr = - new GridNioServerListenerAdapter<Message>() { - @Override public void onSessionWriteTimeout(GridNioSession ses) { - LT.warn(log, null, "Communication SPI Session write timed out (consider increasing " + - "'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() + - ", writeTimeout=" + sockWriteTimeout + ']'); - - if (log.isDebugEnabled()) - log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() + - ", writeTimeout=" + sockWriteTimeout + ']'); - - ses.close(); - } - - @Override public void onConnected(GridNioSession ses) { - if (ses.accepted()) { - if (log.isDebugEnabled()) - log.debug("Sending local node ID to newly accepted session: " + ses); - - ses.send(nodeIdMsg); - } - } - - @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { - UUID id = ses.meta(NODE_ID_META); - - if (id != null) { - GridCommunicationClient rmv = clients.get(id); - - if (rmv instanceof GridTcpNioCommunicationClient && - ((GridTcpNioCommunicationClient)rmv).session() == ses && - clients.remove(id, rmv)) { - rmv.forceClose(); - - if (!isNodeStopping()) { - GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor(); - - if (!getSpiContext().tryFailNode(id)) { - if (recoveryData != null) { - if (recoveryData.nodeAlive(getSpiContext().node(id))) { - if (!recoveryData.messagesFutures().isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Session was closed but there are unacknowledged messages, " + - "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); - - recoveryWorker.addReconnectRequest(recoveryData); - } - } - else - recoveryData.onNodeLeft(); - } - } - else - recoveryData.onNodeLeft(); - } - } - - CommunicationListener<Message> lsnr0 = lsnr; - - if (lsnr0 != null) - lsnr0.onDisconnected(id); - } - } - - @Override public void onMessage(GridNioSession ses, Message msg) { - UUID sndId = ses.meta(NODE_ID_META); 
- - if (sndId == null) { - assert ses.accepted(); - - if (msg instanceof NodeIdMessage) - sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0); - else { - assert msg instanceof HandshakeMessage : msg; - - sndId = ((HandshakeMessage)msg).nodeId(); - } - - if (log.isDebugEnabled()) - log.debug("Remote node ID received: " + sndId); - - final UUID old = ses.addMeta(NODE_ID_META, sndId); - - assert old == null; - - final ClusterNode rmtNode = getSpiContext().node(sndId); - - if (rmtNode == null) { - ses.close(); - - return; - } - - ClusterNode locNode = getSpiContext().localNode(); - - if (ses.remoteAddress() == null) - return; - - GridCommunicationClient oldClient = clients.get(sndId); - - if (oldClient != null) { - if (oldClient instanceof GridTcpNioCommunicationClient) { - if (log.isDebugEnabled()) - log.debug("Received incoming connection when already connected " + - "to this node, rejecting [locNode=" + locNode.id() + - ", rmtNode=" + sndId + ']'); - - ses.send(new RecoveryLastReceivedMessage(-1)); - - return; - } - } - - GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>(); - - GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut); - - assert msg instanceof HandshakeMessage : msg; - - HandshakeMessage msg0 = (HandshakeMessage)msg; - - final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode); - - if (oldFut == null) { - oldClient = clients.get(sndId); - - if (oldClient != null) { - if (oldClient instanceof GridTcpNioCommunicationClient) { - if (log.isDebugEnabled()) - log.debug("Received incoming connection when already connected " + - "to this node, rejecting [locNode=" + locNode.id() + - ", rmtNode=" + sndId + ']'); - - ses.send(new RecoveryLastReceivedMessage(-1)); - - return; - } - } - - boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); - - if (log.isDebugEnabled()) - log.debug("Received incoming connection 
from remote node " + - "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']'); - - if (reserved) { - try { - GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg0.received(), true); - - fut.onDone(client); - } - finally { - clientFuts.remove(rmtNode.id(), fut); - } - } - } - else { - if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) { - if (log.isDebugEnabled()) { - log.debug("Received incoming connection from remote node while " + - "connecting to this node, rejecting [locNode=" + locNode.id() + - ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() + - ", rmtNodeOrder=" + rmtNode.order() + ']'); - } - - ses.send(new RecoveryLastReceivedMessage(-1)); - } - else { - boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); - - if (reserved) { - GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg0.received(), true); - - fut.onDone(client); - } - } - } - } - else { - rcvdMsgsCnt.increment(); - - GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor(); - - if (recovery != null) { - if (msg instanceof RecoveryLastReceivedMessage) { - RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg; - - if (log.isDebugEnabled()) - log.debug("Received recovery acknowledgement [rmtNode=" + sndId + - ", rcvCnt=" + msg0.received() + ']'); - - recovery.ackReceived(msg0.received()); - - return; - } - else { - long rcvCnt = recovery.onReceived(); - - if (rcvCnt % ackSndThreshold == 0) { - if (log.isDebugEnabled()) - log.debug("Send recovery acknowledgement [rmtNode=" + sndId + - ", rcvCnt=" + rcvCnt + ']'); - - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt)); - - recovery.lastAcknowledged(rcvCnt); - } - } - } - - IgniteRunnable c; - - if (msgQueueLimit > 0) { - GridNioMessageTracker tracker = ses.meta(TRACKER_META); - - if (tracker == null) { - GridNioMessageTracker old = 
ses.addMeta(TRACKER_META, tracker = - new GridNioMessageTracker(ses, msgQueueLimit)); - - assert old == null; - } - - tracker.onMessageReceived(); - - c = tracker; - } - else - c = NOOP; - - notifyListener(sndId, msg, c); - } - } - - /** - * @param recovery Recovery descriptor. - * @param ses Session. - * @param node Node. - * @param rcvCnt Number of received messages.. - * @param sndRes If {@code true} sends response for recovery handshake. - * @return Client. - */ - private GridTcpNioCommunicationClient connected( - GridNioRecoveryDescriptor recovery, - GridNioSession ses, - ClusterNode node, - long rcvCnt, - boolean sndRes) { - recovery.onHandshake(rcvCnt); - - ses.recoveryDescriptor(recovery); - - nioSrvr.resend(ses); - - if (sndRes) - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.receivedCount())); - - recovery.connected(); - - GridTcpNioCommunicationClient client = new GridTcpNioCommunicationClient(ses, log); - - GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); - - assert oldClient == null : "Client already created [node=" + node + ", client=" + client + - ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']'; - - return client; - } - - /** - * - */ - @SuppressWarnings("PackageVisibleInnerClass") - class ConnectClosure implements IgniteInClosure<Boolean> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final GridNioSession ses; - - /** */ - private final GridNioRecoveryDescriptor recoveryDesc; - - /** */ - private final ClusterNode rmtNode; - - /** */ - private final HandshakeMessage msg; - - /** */ - private final GridFutureAdapter<GridCommunicationClient> fut; - - /** - * @param ses Incoming session. - * @param recoveryDesc Recovery descriptor. - * @param rmtNode Remote node. - * @param msg Handshake message. - * @param fut Connect future. 
- */ - ConnectClosure(GridNioSession ses, - GridNioRecoveryDescriptor recoveryDesc, - ClusterNode rmtNode, - HandshakeMessage msg, - GridFutureAdapter<GridCommunicationClient> fut) { - this.ses = ses; - this.recoveryDesc = recoveryDesc; - this.rmtNode = rmtNode; - this.msg = msg; - this.fut = fut; - } - - /** {@inheritDoc} */ - @Override public void apply(Boolean success) { - if (success) { - IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> msgFut) { - try { - msgFut.get(); - - GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg.received(), false); - - fut.onDone(client); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send recovery handshake " + - "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); - - recoveryDesc.release(); - - fut.onDone(); - } - finally { - clientFuts.remove(rmtNode.id(), fut); - } - } - }; - - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr); - } - else { - try { - fut.onDone(); - } - finally { - clientFuts.remove(rmtNode.id(), fut); - } - } - } - } - }; + private final GridNioServerListener<Message> srvLsnr = new NioServerListener(); /** Logger. 
*/ @LoggerResource @@ -2893,4 +2539,366 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return S.toString(NodeIdMessage.class, this); } } + + /** + * + */ + private class NioServerListener extends GridNioServerListenerAdapter<Message> { + /** {@inheritDoc} */ + @Override public void onSessionWriteTimeout(GridNioSession ses) { + LT.warn(log, null, "Communication SPI Session write timed out (consider increasing " + + "'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() + + ", writeTimeout=" + sockWriteTimeout + ']'); + + if (log.isDebugEnabled()) + log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() + + ", writeTimeout=" + sockWriteTimeout + ']'); + + ses.close(); + } + + /** {@inheritDoc} */ + @Override public void onConnected(GridNioSession ses) { + if (ses.accepted()) { + if (log.isDebugEnabled()) + log.debug("Sending local node ID to newly accepted session: " + ses); + + ses.send(nodeIdMsg); + } + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + UUID id = ses.meta(NODE_ID_META); + + if (id != null) { + GridCommunicationClient rmv = clients.get(id); + + if (rmv instanceof GridTcpNioCommunicationClient && + ((GridTcpNioCommunicationClient)rmv).session() == ses && + clients.remove(id, rmv)) { + rmv.forceClose(); + + if (!isNodeStopping()) { + GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor(); + + if (!getSpiContext().tryFailNode(id)) { + if (recoveryData != null) { + if (recoveryData.nodeAlive(getSpiContext().node(id))) { + if (!recoveryData.messagesFutures().isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Session was closed but there are unacknowledged messages, " + + "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); + + recoveryWorker.addReconnectRequest(recoveryData); + } + } + else + recoveryData.onNodeLeft(); + } + } + else + recoveryData.onNodeLeft(); + } + } + + 
CommunicationListener<Message> lsnr0 = lsnr; + + if (lsnr0 != null) + lsnr0.onDisconnected(id); + } + } + + /** {@inheritDoc} */ + @Override public void onMessage(GridNioSession ses, Message msg) { + UUID sndId = ses.meta(NODE_ID_META); + + if (sndId == null) { + assert ses.accepted(); + + if (msg instanceof NodeIdMessage) + sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0); + else { + assert msg instanceof HandshakeMessage : msg; + + sndId = ((HandshakeMessage)msg).nodeId(); + } + + if (log.isDebugEnabled()) + log.debug("Remote node ID received: " + sndId); + + final UUID old = ses.addMeta(NODE_ID_META, sndId); + + assert old == null; + + final ClusterNode rmtNode = getSpiContext().node(sndId); + + if (rmtNode == null) { + ses.close(); + + return; + } + + ClusterNode locNode = getSpiContext().localNode(); + + if (ses.remoteAddress() == null) + return; + + GridCommunicationClient oldClient = clients.get(sndId); + + if (oldClient != null) { + if (oldClient instanceof GridTcpNioCommunicationClient) { + if (log.isDebugEnabled()) + log.debug("Received incoming connection when already connected " + + "to this node, rejecting [locNode=" + locNode.id() + + ", rmtNode=" + sndId + ']'); + + ses.send(new RecoveryLastReceivedMessage(-1)); + + return; + } + } + + GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>(); + + GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut); + + assert msg instanceof HandshakeMessage : msg; + + HandshakeMessage msg0 = (HandshakeMessage)msg; + + final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode); + + if (oldFut == null) { + oldClient = clients.get(sndId); + + if (oldClient != null) { + if (oldClient instanceof GridTcpNioCommunicationClient) { + if (log.isDebugEnabled()) + log.debug("Received incoming connection when already connected " + + "to this node, rejecting [locNode=" + locNode.id() + + ", rmtNode=" + sndId + ']'); + + ses.send(new 
RecoveryLastReceivedMessage(-1)); + + return; + } + } + + boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); + + if (log.isDebugEnabled()) + log.debug("Received incoming connection from remote node " + + "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']'); + + if (reserved) { + try { + GridTcpNioCommunicationClient client = + connected(recoveryDesc, ses, rmtNode, msg0.received(), true); + + fut.onDone(client); + } + finally { + clientFuts.remove(rmtNode.id(), fut); + } + } + } + else { + if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) { + if (log.isDebugEnabled()) { + log.debug("Received incoming connection from remote node while " + + "connecting to this node, rejecting [locNode=" + locNode.id() + + ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() + + ", rmtNodeOrder=" + rmtNode.order() + ']'); + } + + ses.send(new RecoveryLastReceivedMessage(-1)); + } + else { + boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); + + if (reserved) { + GridTcpNioCommunicationClient client = + connected(recoveryDesc, ses, rmtNode, msg0.received(), true); + + fut.onDone(client); + } + } + } + } + else { + rcvdMsgsCnt.increment(); + + GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor(); + + if (recovery != null) { + if (msg instanceof RecoveryLastReceivedMessage) { + RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg; + + if (log.isDebugEnabled()) + log.debug("Received recovery acknowledgement [rmtNode=" + sndId + + ", rcvCnt=" + msg0.received() + ']'); + + recovery.ackReceived(msg0.received()); + + return; + } + else { + long rcvCnt = recovery.onReceived(); + + if (rcvCnt % ackSndThreshold == 0) { + if (log.isDebugEnabled()) + log.debug("Send recovery acknowledgement [rmtNode=" + sndId + + ", rcvCnt=" + rcvCnt + ']'); + + nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(rcvCnt)); + + recovery.lastAcknowledged(rcvCnt); + } + } + } + + IgniteRunnable c; + + if (msgQueueLimit > 0) { + GridNioMessageTracker tracker = ses.meta(TRACKER_META); + + if (tracker == null) { + GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker = + new GridNioMessageTracker(ses, msgQueueLimit)); + + assert old == null; + } + + tracker.onMessageReceived(); + + c = tracker; + } + else + c = NOOP; + + notifyListener(sndId, msg, c); + } + } + + /** + * @param recovery Recovery descriptor. + * @param ses Session. + * @param node Node. + * @param rcvCnt Number of received messages.. + * @param sndRes If {@code true} sends response for recovery handshake. + * @return Client. + */ + private GridTcpNioCommunicationClient connected( + GridNioRecoveryDescriptor recovery, + GridNioSession ses, + ClusterNode node, + long rcvCnt, + boolean sndRes) { + recovery.onHandshake(rcvCnt); + + ses.recoveryDescriptor(recovery); + + nioSrvr.resend(ses); + + if (sndRes) + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.receivedCount())); + + recovery.connected(); + + GridTcpNioCommunicationClient client = new GridTcpNioCommunicationClient(ses, log); + + GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); + + assert oldClient == null : "Client already created [node=" + node + ", client=" + client + + ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']'; + + return client; + } + + /** + * + */ + @SuppressWarnings("PackageVisibleInnerClass") + class ConnectClosure implements IgniteInClosure<Boolean> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final GridNioSession ses; + + /** */ + private final GridNioRecoveryDescriptor recoveryDesc; + + /** */ + private final ClusterNode rmtNode; + + /** */ + private final HandshakeMessage msg; + + /** */ + private final GridFutureAdapter<GridCommunicationClient> fut; + + /** + * @param ses Incoming session. 
+ * @param recoveryDesc Recovery descriptor. + * @param rmtNode Remote node. + * @param msg Handshake message. + * @param fut Connect future. + */ + ConnectClosure(GridNioSession ses, + GridNioRecoveryDescriptor recoveryDesc, + ClusterNode rmtNode, + HandshakeMessage msg, + GridFutureAdapter<GridCommunicationClient> fut) { + this.ses = ses; + this.recoveryDesc = recoveryDesc; + this.rmtNode = rmtNode; + this.msg = msg; + this.fut = fut; + } + + /** {@inheritDoc} */ + @Override public void apply(Boolean success) { + if (success) { + IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> msgFut) { + try { + msgFut.get(); + + GridTcpNioCommunicationClient client = + connected(recoveryDesc, ses, rmtNode, msg.received(), false); + + fut.onDone(client); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send recovery handshake " + + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); + + recoveryDesc.release(); + + fut.onDone(); + } + finally { + clientFuts.remove(rmtNode.id(), fut); + } + } + }; + + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr); + } + else { + try { + fut.onDone(); + } + finally { + clientFuts.remove(rmtNode.id(), fut); + } + } + } + } + } }