Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-51 d7db0122b -> bdb0f5572


IGNITE-11 (Discovery takes a lot of time on Windows if DescoverySpi contains 
several hosts and several ports per host)


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a9ecd99f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a9ecd99f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a9ecd99f

Branch: refs/heads/ignite-51
Commit: a9ecd99f1b0713890c16e42cd53c264167959e0f
Parents: c93d86f
Author: sevdokimov <sevdoki...@gridgain.com>
Authored: Thu Feb 19 16:30:41 2015 +0300
Committer: sevdokimov <sevdoki...@gridgain.com>
Committed: Thu Feb 19 16:30:41 2015 +0300

----------------------------------------------------------------------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 138 +++++++++++--------
 .../discovery/tcp/TcpDiscoverySpiAdapter.java   | 110 +++++++++++++++
 2 files changed, 193 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9ecd99f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index aef8259..3e9c120 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1395,65 +1395,83 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                 return false;
 
             boolean retry = false;
-            IgniteCheckedException errs = null;
+            Collection<Exception> errs = new ArrayList<>();
 
-            for (InetSocketAddress addr : addrs) {
-                try {
-                    Integer res = sendMessageDirectly(joinReq, addr);
+            SocketMultiConnector multiConnector = new 
SocketMultiConnector(addrs, 2);
 
-                    assert res != null;
+            try {
+                GridTuple3<InetSocketAddress, Socket, Exception> tuple;
 
-                    noResAddrs.remove(addr);
+                while ((tuple = multiConnector.next()) != null) {
+                    InetSocketAddress addr = tuple.get1();
+                    Socket sock = tuple.get2();
+                    Exception ex = tuple.get3();
 
-                    // Address is responsive, reset period start.
-                    noResStart = 0;
+                    if (ex == null) {
+                        assert sock != null;
 
-                    switch (res) {
-                        case RES_WAIT:
-                            // Concurrent startup, try sending join request 
again or wait if no success.
-                            retry = true;
+                        try {
+                            Integer res = sendMessageDirectly(joinReq, addr, 
sock);
 
-                            break;
-                        case RES_OK:
-                            if (log.isDebugEnabled())
-                                log.debug("Join request message has been sent 
to address [addr=" + addr +
-                                    ", req=" + joinReq + ']');
+                            assert res != null;
 
-                            // Join request sending succeeded, wait for 
response from topology.
-                            return true;
+                            noResAddrs.remove(addr);
 
-                        default:
-                            // Concurrent startup, try next node.
-                            if (res == RES_CONTINUE_JOIN) {
-                                if (!fromAddrs.contains(addr))
+                            // Address is responsive, reset period start.
+                            noResStart = 0;
+
+                            switch (res) {
+                                case RES_WAIT:
+                                    // Concurrent startup, try sending join 
request again or wait if no success.
                                     retry = true;
-                            }
-                            else {
-                                if (log.isDebugEnabled())
-                                    log.debug("Unexpected response to join 
request: " + res);
 
-                                retry = true;
-                            }
+                                    break;
+                                case RES_OK:
+                                    if (log.isDebugEnabled())
+                                        log.debug("Join request message has 
been sent to address [addr=" + addr +
+                                            ", req=" + joinReq + ']');
 
-                            break;
-                    }
-                }
-                catch (IgniteSpiException e) {
-                    if (errs == null)
-                        errs = new IgniteCheckedException("Multiple connection 
attempts failed.");
+                                    // Join request sending succeeded, wait 
for response from topology.
+                                    return true;
 
-                    errs.addSuppressed(e);
+                                default:
+                                    // Concurrent startup, try next node.
+                                    if (res == RES_CONTINUE_JOIN) {
+                                        if (!fromAddrs.contains(addr))
+                                            retry = true;
+                                    }
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Unexpected response to 
join request: " + res);
 
-                    if (log.isDebugEnabled()) {
-                        IOException ioe = X.cause(e, IOException.class);
+                                        retry = true;
+                                    }
 
-                        log.debug("Failed to send join request message [addr=" 
+ addr +
-                            ", msg=" + ioe != null ? ioe.getMessage() : 
e.getMessage() + ']');
+                                    break;
+                            }
+                        }
+                        catch (IgniteSpiException e) {
+                            ex = e;
+                        }
                     }
 
-                    noResAddrs.add(addr);
+                    if (ex != null) {
+                        errs.add(ex);
+
+                        if (log.isDebugEnabled()) {
+                            IOException ioe = X.cause(ex, IOException.class);
+
+                            log.debug("Failed to send join request message 
[addr=" + addr +
+                                ", msg=" + ioe != null ? ioe.getMessage() : 
ex.getMessage() + ']');
+                        }
+
+                        noResAddrs.add(addr);
+                    }
                 }
             }
+            finally {
+                multiConnector.close();
+            }
 
             if (retry) {
                 if (log.isDebugEnabled())
@@ -1467,7 +1485,16 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                 }
             }
             else if (!ipFinder.isShared() && !ipFinderHasLocAddr) {
-                if (errs != null && X.hasCause(errs, ConnectException.class))
+                IgniteCheckedException e = null;
+
+                if (!errs.isEmpty()) {
+                    e = new IgniteCheckedException("Multiple connection 
attempts failed.");
+
+                    for (Exception err : errs)
+                        e.addSuppressed(err);
+                }
+
+                if (e != null && X.hasCause(e, ConnectException.class))
                     LT.warn(log, null, "Failed to connect to any address from 
IP finder " +
                         "(make sure IP finder addresses are correct and 
firewalls are disabled on all host machines): " +
                         addrs);
@@ -1480,14 +1507,14 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                             "Failed to connect to any address from IP finder 
within join timeout " +
                                 "(make sure IP finder addresses are correct, 
and operating system firewalls are disabled " +
                                 "on all host machines, or consider increasing 
'joinTimeout' configuration property): " +
-                                addrs, errs);
+                                addrs, e);
                 }
 
                 try {
                     U.sleep(2000);
                 }
-                catch (IgniteInterruptedCheckedException e) {
-                    throw new IgniteSpiException("Thread has been 
interrupted.", e);
+                catch (IgniteInterruptedCheckedException ex) {
+                    throw new IgniteSpiException("Thread has been 
interrupted.", ex);
                 }
             }
             else
@@ -1503,17 +1530,15 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
      * @param msg Message to send.
      * @param addr Address to send message to.
      * @return Response read from the recipient or {@code null} if no response 
is supposed.
-     * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
+     * @throws IgniteSpiException If an error occurs.
      */
-    @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage 
msg, InetSocketAddress addr)
+    @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage 
msg, InetSocketAddress addr, Socket sock)
         throws IgniteSpiException {
         assert msg != null;
         assert addr != null;
 
         Collection<Throwable> errs = null;
 
-        Socket sock = null;
-
         long ackTimeout0 = ackTimeout;
 
         int connectAttempts = 1;
@@ -1532,7 +1557,8 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
             try {
                 long tstamp = U.currentTimeMillis();
 
-                sock = openSocket(addr);
+                if (sock == null)
+                    sock = openSocket(addr);
 
                 openSock = true;
 
@@ -1612,6 +1638,8 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
             }
             finally {
                 U.closeQuiet(sock);
+
+                sock = null;
             }
         }
 
@@ -1634,7 +1662,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
      * Marshalls credentials with discovery SPI marshaller (will replace 
attribute value).
      *
      * @param node Node to marshall credentials for.
-     * @throws org.apache.ignite.spi.IgniteSpiException If marshalling failed.
+     * @throws IgniteSpiException If marshalling failed.
      */
     private void marshalCredentials(TcpDiscoveryNode node) throws 
IgniteSpiException {
         try {
@@ -1656,7 +1684,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
      *
      * @param node Node to unmarshall credentials for.
      * @return Security credentials.
-     * @throws org.apache.ignite.spi.IgniteSpiException If unmarshal fails.
+     * @throws IgniteSpiException If unmarshal fails.
      */
     private GridSecurityCredentials unmarshalCredentials(TcpDiscoveryNode 
node) throws IgniteSpiException {
         try {
@@ -3337,7 +3365,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
          *
          * @param node Node to send message to.
          * @param msg Message.
-         * @throws org.apache.ignite.spi.IgniteSpiException Last failure if 
all attempts failed.
+         * @throws IgniteSpiException Last failure if all attempts failed.
          */
         private void trySendMessageDirectly(TcpDiscoveryNode node, 
TcpDiscoveryAbstractMessage msg)
             throws IgniteSpiException {
@@ -3358,7 +3386,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
 
             for (InetSocketAddress addr : getNodeAddresses(node, 
U.sameMacs(locNode, node))) {
                 try {
-                    sendMessageDirectly(msg, addr);
+                    sendMessageDirectly(msg, addr, null);
 
                     ex = null;
 
@@ -4385,7 +4413,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
         /**
          * Constructor.
          *
-         * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+         * @throws IgniteSpiException In case of error.
          */
         TcpServer() throws IgniteSpiException {
             super(ignite.name(), "tcp-disco-srvr", log);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9ecd99f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index 52156a4..87ee2fe 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -1005,4 +1006,113 @@ abstract class TcpDiscoverySpiAdapter extends 
IgniteSpiAdapter implements Discov
             TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout);
         }
     }
+
+    /**
+     *
+     */
+    protected class SocketMultiConnector {
+        /** */
+        private int connInProgress;
+
+        /** */
+        private boolean closed;
+
+        /** */
+        private final ExecutorService executor;
+
+        /** */
+        private final Queue<GridTuple3<InetSocketAddress, Socket, Exception>> 
queue = new LinkedList<>();
+
+        /**
+         * @param addrs Addresses.
+         * @param retryCnt Retry count.
+         */
+        public SocketMultiConnector(Collection<InetSocketAddress> addrs, final 
int retryCnt) {
+            connInProgress = addrs.size();
+
+            executor = new ThreadPoolExecutor(0, 10, 1L, TimeUnit.MILLISECONDS,
+                new SynchronousQueue<Runnable>());
+
+            for (final InetSocketAddress addr : addrs) {
+                executor.execute(new Runnable() {
+                    @Override public void run() {
+                        Exception ex = null;
+                        Socket sock = null;
+
+                        for (int i = 0; i < retryCnt; i++) {
+                            synchronized (SocketMultiConnector.this) {
+                                if (closed)
+                                    return;
+                            }
+
+                            try {
+                                sock = openSocket(addr);
+
+                                break;
+                            }
+                            catch (Exception e) {
+                                ex = e;
+                            }
+                        }
+
+                        synchronized (SocketMultiConnector.this) {
+                            if (closed)
+                                U.closeQuiet(sock);
+                            else
+                                queue.add(new GridTuple3<>(addr, sock, ex));
+
+                            connInProgress--;
+
+                            SocketMultiConnector.this.notifyAll();
+                        }
+                    }
+                });
+            }
+        }
+
+        /**
+         *
+         */
+        @Nullable public synchronized GridTuple3<InetSocketAddress, Socket, 
Exception> next() {
+            try {
+                do {
+                    if (closed)
+                        return null;
+
+                    GridTuple3<InetSocketAddress, Socket, Exception> res = 
queue.poll();
+
+                    if (res != null)
+                        return res;
+
+                    if (connInProgress == 0)
+                        return null;
+
+                    wait();
+                }
+                while (true);
+            }
+            catch (InterruptedException e) {
+                throw new IgniteSpiException("Thread has been interrupted.", 
e);
+            }
+        }
+
+        /**
+         *
+         */
+        public void close() {
+            synchronized (this) {
+                if (closed)
+                    return;
+
+                closed = true;
+
+                notifyAll();
+            }
+
+            executor.shutdown();
+
+            for (GridTuple3<InetSocketAddress, Socket, Exception> tuple : 
queue)
+                U.closeQuiet(tuple.get2());
+        }
+    }
 }

Reply via email to