# IGNITE-499 Refactoring: convert anonymous class to inner class.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/22dcbb34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/22dcbb34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/22dcbb34

Branch: refs/heads/ignite-499_1
Commit: 22dcbb3490d891cccc447e5520d4d7e17d269968
Parents: 3a2d5c2
Author: sevdokimov <sevdoki...@gridgain.com>
Authored: Fri Apr 17 14:41:05 2015 +0300
Committer: sevdokimov <sevdoki...@gridgain.com>
Committed: Fri Apr 17 14:41:05 2015 +0300

----------------------------------------------------------------------
 .../communication/tcp/TcpCommunicationSpi.java  | 718 ++++++++++---------
 1 file changed, 363 insertions(+), 355 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/22dcbb34/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 8312638..98db27d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -221,361 +221,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final byte HANDSHAKE_MSG_TYPE = -3;
 
     /** Server listener. */
-    private final GridNioServerListener<Message> srvLsnr =
-        new GridNioServerListenerAdapter<Message>() {
-            @Override public void onSessionWriteTimeout(GridNioSession ses) {
-                LT.warn(log, null, "Communication SPI Session write timed out 
(consider increasing " +
-                    "'socketWriteTimeout' " + "configuration property) 
[remoteAddr=" + ses.remoteAddress() +
-                    ", writeTimeout=" + sockWriteTimeout + ']');
-
-                if (log.isDebugEnabled())
-                    log.debug("Closing communication SPI session on write 
timeout [remoteAddr=" + ses.remoteAddress() +
-                        ", writeTimeout=" + sockWriteTimeout + ']');
-
-                ses.close();
-            }
-
-            @Override public void onConnected(GridNioSession ses) {
-                if (ses.accepted()) {
-                    if (log.isDebugEnabled())
-                        log.debug("Sending local node ID to newly accepted 
session: " + ses);
-
-                    ses.send(nodeIdMsg);
-                }
-            }
-
-            @Override public void onDisconnected(GridNioSession ses, @Nullable 
Exception e) {
-                UUID id = ses.meta(NODE_ID_META);
-
-                if (id != null) {
-                    GridCommunicationClient rmv = clients.get(id);
-
-                    if (rmv instanceof GridTcpNioCommunicationClient &&
-                        ((GridTcpNioCommunicationClient)rmv).session() == ses 
&&
-                        clients.remove(id, rmv)) {
-                        rmv.forceClose();
-
-                        if (!isNodeStopping()) {
-                            GridNioRecoveryDescriptor recoveryData = 
ses.recoveryDescriptor();
-
-                            if (!getSpiContext().tryFailNode(id)) {
-                                if (recoveryData != null) {
-                                    if 
(recoveryData.nodeAlive(getSpiContext().node(id))) {
-                                        if 
(!recoveryData.messagesFutures().isEmpty()) {
-                                            if (log.isDebugEnabled())
-                                                log.debug("Session was closed 
but there are unacknowledged messages, " +
-                                                    "will try to reconnect 
[rmtNode=" + recoveryData.node().id() + ']');
-
-                                            
recoveryWorker.addReconnectRequest(recoveryData);
-                                        }
-                                    }
-                                    else
-                                        recoveryData.onNodeLeft();
-                                }
-                            }
-                            else
-                                recoveryData.onNodeLeft();
-                        }
-                    }
-
-                    CommunicationListener<Message> lsnr0 = lsnr;
-
-                    if (lsnr0 != null)
-                        lsnr0.onDisconnected(id);
-                }
-            }
-
-            @Override public void onMessage(GridNioSession ses, Message msg) {
-                UUID sndId = ses.meta(NODE_ID_META);
-
-                if (sndId == null) {
-                    assert ses.accepted();
-
-                    if (msg instanceof NodeIdMessage)
-                        sndId = U.bytesToUuid(((NodeIdMessage) 
msg).nodeIdBytes, 0);
-                    else {
-                        assert msg instanceof HandshakeMessage : msg;
-
-                        sndId = ((HandshakeMessage)msg).nodeId();
-                    }
-
-                    if (log.isDebugEnabled())
-                        log.debug("Remote node ID received: " + sndId);
-
-                    final UUID old = ses.addMeta(NODE_ID_META, sndId);
-
-                    assert old == null;
-
-                    final ClusterNode rmtNode = getSpiContext().node(sndId);
-
-                    if (rmtNode == null) {
-                        ses.close();
-
-                        return;
-                    }
-
-                    ClusterNode locNode = getSpiContext().localNode();
-
-                    if (ses.remoteAddress() == null)
-                        return;
-
-                    GridCommunicationClient oldClient = clients.get(sndId);
-
-                    if (oldClient != null) {
-                        if (oldClient instanceof 
GridTcpNioCommunicationClient) {
-                            if (log.isDebugEnabled())
-                                log.debug("Received incoming connection when 
already connected " +
-                                    "to this node, rejecting [locNode=" + 
locNode.id() +
-                                    ", rmtNode=" + sndId + ']');
-
-                            ses.send(new RecoveryLastReceivedMessage(-1));
-
-                            return;
-                        }
-                    }
-
-                    GridFutureAdapter<GridCommunicationClient> fut = new 
GridFutureAdapter<>();
-
-                    GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(sndId, fut);
-
-                    assert msg instanceof HandshakeMessage : msg;
-
-                    HandshakeMessage msg0 = (HandshakeMessage)msg;
-
-                    final GridNioRecoveryDescriptor recoveryDesc = 
recoveryDescriptor(rmtNode);
-
-                    if (oldFut == null) {
-                        oldClient = clients.get(sndId);
-
-                        if (oldClient != null) {
-                            if (oldClient instanceof 
GridTcpNioCommunicationClient) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Received incoming connection 
when already connected " +
-                                        "to this node, rejecting [locNode=" + 
locNode.id() +
-                                        ", rmtNode=" + sndId + ']');
-
-                                ses.send(new RecoveryLastReceivedMessage(-1));
-
-                                return;
-                            }
-                        }
-
-                        boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
-                            new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, fut));
-
-                        if (log.isDebugEnabled())
-                            log.debug("Received incoming connection from 
remote node " +
-                                "[rmtNode=" + rmtNode.id() + ", reserved=" + 
reserved + ']');
-
-                        if (reserved) {
-                            try {
-                                GridTcpNioCommunicationClient client =
-                                    connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true);
-
-                                fut.onDone(client);
-                            }
-                            finally {
-                                clientFuts.remove(rmtNode.id(), fut);
-                            }
-                        }
-                    }
-                    else {
-                        if (oldFut instanceof ConnectFuture && locNode.order() 
< rmtNode.order()) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("Received incoming connection from 
remote node while " +
-                                    "connecting to this node, rejecting 
[locNode=" + locNode.id() +
-                                    ", locNodeOrder=" + locNode.order() + ", 
rmtNode=" + rmtNode.id() +
-                                    ", rmtNodeOrder=" + rmtNode.order() + ']');
-                            }
-
-                            ses.send(new RecoveryLastReceivedMessage(-1));
-                        }
-                        else {
-                            boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
-                                new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, fut));
-
-                            if (reserved) {
-                                GridTcpNioCommunicationClient client =
-                                    connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true);
-
-                                fut.onDone(client);
-                            }
-                        }
-                    }
-                }
-                else {
-                    rcvdMsgsCnt.increment();
-
-                    GridNioRecoveryDescriptor recovery = 
ses.recoveryDescriptor();
-
-                    if (recovery != null) {
-                        if (msg instanceof RecoveryLastReceivedMessage) {
-                            RecoveryLastReceivedMessage msg0 = 
(RecoveryLastReceivedMessage)msg;
-
-                            if (log.isDebugEnabled())
-                                log.debug("Received recovery acknowledgement 
[rmtNode=" + sndId +
-                                    ", rcvCnt=" + msg0.received() + ']');
-
-                            recovery.ackReceived(msg0.received());
-
-                            return;
-                        }
-                        else {
-                            long rcvCnt = recovery.onReceived();
-
-                            if (rcvCnt % ackSndThreshold == 0) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Send recovery acknowledgement 
[rmtNode=" + sndId +
-                                        ", rcvCnt=" + rcvCnt + ']');
-
-                                nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(rcvCnt));
-
-                                recovery.lastAcknowledged(rcvCnt);
-                            }
-                        }
-                    }
-
-                    IgniteRunnable c;
-
-                    if (msgQueueLimit > 0) {
-                        GridNioMessageTracker tracker = ses.meta(TRACKER_META);
-
-                        if (tracker == null) {
-                            GridNioMessageTracker old = 
ses.addMeta(TRACKER_META, tracker =
-                                new GridNioMessageTracker(ses, msgQueueLimit));
-
-                            assert old == null;
-                        }
-
-                        tracker.onMessageReceived();
-
-                        c = tracker;
-                    }
-                    else
-                        c = NOOP;
-
-                    notifyListener(sndId, msg, c);
-                }
-            }
-
-            /**
-             * @param recovery Recovery descriptor.
-             * @param ses Session.
-             * @param node Node.
-             * @param rcvCnt Number of received messages..
-             * @param sndRes If {@code true} sends response for recovery 
handshake.
-             * @return Client.
-             */
-            private GridTcpNioCommunicationClient connected(
-                GridNioRecoveryDescriptor recovery,
-                GridNioSession ses,
-                ClusterNode node,
-                long rcvCnt,
-                boolean sndRes) {
-                recovery.onHandshake(rcvCnt);
-
-                ses.recoveryDescriptor(recovery);
-
-                nioSrvr.resend(ses);
-
-                if (sndRes)
-                    nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(recovery.receivedCount()));
-
-                recovery.connected();
-
-                GridTcpNioCommunicationClient client = new 
GridTcpNioCommunicationClient(ses, log);
-
-                GridCommunicationClient oldClient = 
clients.putIfAbsent(node.id(), client);
-
-                assert oldClient == null : "Client already created [node=" + 
node + ", client=" + client +
-                        ", oldClient=" + oldClient + ", recoveryDesc=" + 
recovery + ']';
-
-                return client;
-            }
-
-            /**
-             *
-             */
-            @SuppressWarnings("PackageVisibleInnerClass")
-            class ConnectClosure implements IgniteInClosure<Boolean> {
-                /** */
-                private static final long serialVersionUID = 0L;
-
-                /** */
-                private final GridNioSession ses;
-
-                /** */
-                private final GridNioRecoveryDescriptor recoveryDesc;
-
-                /** */
-                private final ClusterNode rmtNode;
-
-                /** */
-                private final HandshakeMessage msg;
-
-                /** */
-                private final GridFutureAdapter<GridCommunicationClient> fut;
-
-                /**
-                 * @param ses Incoming session.
-                 * @param recoveryDesc Recovery descriptor.
-                 * @param rmtNode Remote node.
-                 * @param msg Handshake message.
-                 * @param fut Connect future.
-                 */
-                ConnectClosure(GridNioSession ses,
-                    GridNioRecoveryDescriptor recoveryDesc,
-                    ClusterNode rmtNode,
-                    HandshakeMessage msg,
-                    GridFutureAdapter<GridCommunicationClient> fut) {
-                    this.ses = ses;
-                    this.recoveryDesc = recoveryDesc;
-                    this.rmtNode = rmtNode;
-                    this.msg = msg;
-                    this.fut = fut;
-                }
-
-                /** {@inheritDoc} */
-                @Override public void apply(Boolean success) {
-                    if (success) {
-                        IgniteInClosure<IgniteInternalFuture<?>> lsnr = new 
IgniteInClosure<IgniteInternalFuture<?>>() {
-                            @Override public void 
apply(IgniteInternalFuture<?> msgFut) {
-                                try {
-                                    msgFut.get();
-
-                                    GridTcpNioCommunicationClient client =
-                                        connected(recoveryDesc, ses, rmtNode, 
msg.received(), false);
-
-                                    fut.onDone(client);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Failed to send recovery 
handshake " +
-                                            "[rmtNode=" + rmtNode.id() + ", 
err=" + e + ']');
-
-                                    recoveryDesc.release();
-
-                                    fut.onDone();
-                                }
-                                finally {
-                                    clientFuts.remove(rmtNode.id(), fut);
-                                }
-                            }
-                        };
-
-                        nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr);
-                    }
-                    else {
-                        try {
-                            fut.onDone();
-                        }
-                        finally {
-                            clientFuts.remove(rmtNode.id(), fut);
-                        }
-                    }
-                }
-            }
-        };
+    private final GridNioServerListener<Message> srvLsnr = new 
NioServerListener();
 
     /** Logger. */
     @LoggerResource
@@ -2893,4 +2539,366 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
             return S.toString(NodeIdMessage.class, this);
         }
     }
+
+    /**
+     *
+     */
+    private class NioServerListener extends 
GridNioServerListenerAdapter<Message> {
+        /** {@inheritDoc} */
+        @Override public void onSessionWriteTimeout(GridNioSession ses) {
+            LT.warn(log, null, "Communication SPI Session write timed out 
(consider increasing " +
+                "'socketWriteTimeout' " + "configuration property) 
[remoteAddr=" + ses.remoteAddress() +
+                ", writeTimeout=" + sockWriteTimeout + ']');
+
+            if (log.isDebugEnabled())
+                log.debug("Closing communication SPI session on write timeout 
[remoteAddr=" + ses.remoteAddress() +
+                    ", writeTimeout=" + sockWriteTimeout + ']');
+
+            ses.close();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onConnected(GridNioSession ses) {
+            if (ses.accepted()) {
+                if (log.isDebugEnabled())
+                    log.debug("Sending local node ID to newly accepted 
session: " + ses);
+
+                ses.send(nodeIdMsg);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onDisconnected(GridNioSession ses, @Nullable 
Exception e) {
+            UUID id = ses.meta(NODE_ID_META);
+
+            if (id != null) {
+                GridCommunicationClient rmv = clients.get(id);
+
+                if (rmv instanceof GridTcpNioCommunicationClient &&
+                    ((GridTcpNioCommunicationClient)rmv).session() == ses &&
+                    clients.remove(id, rmv)) {
+                    rmv.forceClose();
+
+                    if (!isNodeStopping()) {
+                        GridNioRecoveryDescriptor recoveryData = 
ses.recoveryDescriptor();
+
+                        if (!getSpiContext().tryFailNode(id)) {
+                            if (recoveryData != null) {
+                                if 
(recoveryData.nodeAlive(getSpiContext().node(id))) {
+                                    if 
(!recoveryData.messagesFutures().isEmpty()) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Session was closed but 
there are unacknowledged messages, " +
+                                                "will try to reconnect 
[rmtNode=" + recoveryData.node().id() + ']');
+
+                                        
recoveryWorker.addReconnectRequest(recoveryData);
+                                    }
+                                }
+                                else
+                                    recoveryData.onNodeLeft();
+                            }
+                        }
+                        else
+                            recoveryData.onNodeLeft();
+                    }
+                }
+
+                CommunicationListener<Message> lsnr0 = lsnr;
+
+                if (lsnr0 != null)
+                    lsnr0.onDisconnected(id);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(GridNioSession ses, Message msg) {
+            UUID sndId = ses.meta(NODE_ID_META);
+
+            if (sndId == null) {
+                assert ses.accepted();
+
+                if (msg instanceof NodeIdMessage)
+                    sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 
0);
+                else {
+                    assert msg instanceof HandshakeMessage : msg;
+
+                    sndId = ((HandshakeMessage)msg).nodeId();
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Remote node ID received: " + sndId);
+
+                final UUID old = ses.addMeta(NODE_ID_META, sndId);
+
+                assert old == null;
+
+                final ClusterNode rmtNode = getSpiContext().node(sndId);
+
+                if (rmtNode == null) {
+                    ses.close();
+
+                    return;
+                }
+
+                ClusterNode locNode = getSpiContext().localNode();
+
+                if (ses.remoteAddress() == null)
+                    return;
+
+                GridCommunicationClient oldClient = clients.get(sndId);
+
+                if (oldClient != null) {
+                    if (oldClient instanceof GridTcpNioCommunicationClient) {
+                        if (log.isDebugEnabled())
+                            log.debug("Received incoming connection when 
already connected " +
+                                "to this node, rejecting [locNode=" + 
locNode.id() +
+                                ", rmtNode=" + sndId + ']');
+
+                        ses.send(new RecoveryLastReceivedMessage(-1));
+
+                        return;
+                    }
+                }
+
+                GridFutureAdapter<GridCommunicationClient> fut = new 
GridFutureAdapter<>();
+
+                GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(sndId, fut);
+
+                assert msg instanceof HandshakeMessage : msg;
+
+                HandshakeMessage msg0 = (HandshakeMessage)msg;
+
+                final GridNioRecoveryDescriptor recoveryDesc = 
recoveryDescriptor(rmtNode);
+
+                if (oldFut == null) {
+                    oldClient = clients.get(sndId);
+
+                    if (oldClient != null) {
+                        if (oldClient instanceof 
GridTcpNioCommunicationClient) {
+                            if (log.isDebugEnabled())
+                                log.debug("Received incoming connection when 
already connected " +
+                                    "to this node, rejecting [locNode=" + 
locNode.id() +
+                                    ", rmtNode=" + sndId + ']');
+
+                            ses.send(new RecoveryLastReceivedMessage(-1));
+
+                            return;
+                        }
+                    }
+
+                    boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
+                        new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, 
fut));
+
+                    if (log.isDebugEnabled())
+                        log.debug("Received incoming connection from remote 
node " +
+                            "[rmtNode=" + rmtNode.id() + ", reserved=" + 
reserved + ']');
+
+                    if (reserved) {
+                        try {
+                            GridTcpNioCommunicationClient client =
+                                connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true);
+
+                            fut.onDone(client);
+                        }
+                        finally {
+                            clientFuts.remove(rmtNode.id(), fut);
+                        }
+                    }
+                }
+                else {
+                    if (oldFut instanceof ConnectFuture && locNode.order() < 
rmtNode.order()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Received incoming connection from 
remote node while " +
+                                "connecting to this node, rejecting [locNode=" 
+ locNode.id() +
+                                ", locNodeOrder=" + locNode.order() + ", 
rmtNode=" + rmtNode.id() +
+                                ", rmtNodeOrder=" + rmtNode.order() + ']');
+                        }
+
+                        ses.send(new RecoveryLastReceivedMessage(-1));
+                    }
+                    else {
+                        boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
+                            new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, fut));
+
+                        if (reserved) {
+                            GridTcpNioCommunicationClient client =
+                                connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true);
+
+                            fut.onDone(client);
+                        }
+                    }
+                }
+            }
+            else {
+                rcvdMsgsCnt.increment();
+
+                GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+
+                if (recovery != null) {
+                    if (msg instanceof RecoveryLastReceivedMessage) {
+                        RecoveryLastReceivedMessage msg0 = 
(RecoveryLastReceivedMessage)msg;
+
+                        if (log.isDebugEnabled())
+                            log.debug("Received recovery acknowledgement 
[rmtNode=" + sndId +
+                                ", rcvCnt=" + msg0.received() + ']');
+
+                        recovery.ackReceived(msg0.received());
+
+                        return;
+                    }
+                    else {
+                        long rcvCnt = recovery.onReceived();
+
+                        if (rcvCnt % ackSndThreshold == 0) {
+                            if (log.isDebugEnabled())
+                                log.debug("Send recovery acknowledgement 
[rmtNode=" + sndId +
+                                    ", rcvCnt=" + rcvCnt + ']');
+
+                            nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(rcvCnt));
+
+                            recovery.lastAcknowledged(rcvCnt);
+                        }
+                    }
+                }
+
+                IgniteRunnable c;
+
+                if (msgQueueLimit > 0) {
+                    GridNioMessageTracker tracker = ses.meta(TRACKER_META);
+
+                    if (tracker == null) {
+                        GridNioMessageTracker old = ses.addMeta(TRACKER_META, 
tracker =
+                            new GridNioMessageTracker(ses, msgQueueLimit));
+
+                        assert old == null;
+                    }
+
+                    tracker.onMessageReceived();
+
+                    c = tracker;
+                }
+                else
+                    c = NOOP;
+
+                notifyListener(sndId, msg, c);
+            }
+        }
+
+        /**
+         * @param recovery Recovery descriptor.
+         * @param ses Session.
+         * @param node Node.
+         * @param rcvCnt Number of received messages..
+         * @param sndRes If {@code true} sends response for recovery handshake.
+         * @return Client.
+         */
+        private GridTcpNioCommunicationClient connected(
+            GridNioRecoveryDescriptor recovery,
+            GridNioSession ses,
+            ClusterNode node,
+            long rcvCnt,
+            boolean sndRes) {
+            recovery.onHandshake(rcvCnt);
+
+            ses.recoveryDescriptor(recovery);
+
+            nioSrvr.resend(ses);
+
+            if (sndRes)
+                nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(recovery.receivedCount()));
+
+            recovery.connected();
+
+            GridTcpNioCommunicationClient client = new 
GridTcpNioCommunicationClient(ses, log);
+
+            GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), 
client);
+
+            assert oldClient == null : "Client already created [node=" + node 
+ ", client=" + client +
+                    ", oldClient=" + oldClient + ", recoveryDesc=" + recovery 
+ ']';
+
+            return client;
+        }
+
+        /**
+         *
+         */
+        @SuppressWarnings("PackageVisibleInnerClass")
+        class ConnectClosure implements IgniteInClosure<Boolean> {
+            /** */
+            private static final long serialVersionUID = 0L;
+
+            /** */
+            private final GridNioSession ses;
+
+            /** */
+            private final GridNioRecoveryDescriptor recoveryDesc;
+
+            /** */
+            private final ClusterNode rmtNode;
+
+            /** */
+            private final HandshakeMessage msg;
+
+            /** */
+            private final GridFutureAdapter<GridCommunicationClient> fut;
+
+            /**
+             * @param ses Incoming session.
+             * @param recoveryDesc Recovery descriptor.
+             * @param rmtNode Remote node.
+             * @param msg Handshake message.
+             * @param fut Connect future.
+             */
+            ConnectClosure(GridNioSession ses,
+                GridNioRecoveryDescriptor recoveryDesc,
+                ClusterNode rmtNode,
+                HandshakeMessage msg,
+                GridFutureAdapter<GridCommunicationClient> fut) {
+                this.ses = ses;
+                this.recoveryDesc = recoveryDesc;
+                this.rmtNode = rmtNode;
+                this.msg = msg;
+                this.fut = fut;
+            }
+
+            /** {@inheritDoc} */
+            @Override public void apply(Boolean success) {
+                if (success) {
+                    IgniteInClosure<IgniteInternalFuture<?>> lsnr = new 
IgniteInClosure<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> 
msgFut) {
+                            try {
+                                msgFut.get();
+
+                                GridTcpNioCommunicationClient client =
+                                    connected(recoveryDesc, ses, rmtNode, 
msg.received(), false);
+
+                                fut.onDone(client);
+                            }
+                            catch (IgniteCheckedException e) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to send recovery 
handshake " +
+                                        "[rmtNode=" + rmtNode.id() + ", err=" 
+ e + ']');
+
+                                recoveryDesc.release();
+
+                                fut.onDone();
+                            }
+                            finally {
+                                clientFuts.remove(rmtNode.id(), fut);
+                            }
+                        }
+                    };
+
+                    nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr);
+                }
+                else {
+                    try {
+                        fut.onDone();
+                    }
+                    finally {
+                        clientFuts.remove(rmtNode.id(), fut);
+                    }
+                }
+            }
+        }
+    }
 }

Reply via email to