Repository: incubator-ignite Updated Branches: refs/heads/ignite-237 6deba53c2 -> 6c67bdf27
IGNITE-11 (Discovery takes a lot of time on Windows if DescoverySpi contains several hosts and several ports per host) Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a9ecd99f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a9ecd99f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a9ecd99f Branch: refs/heads/ignite-237 Commit: a9ecd99f1b0713890c16e42cd53c264167959e0f Parents: c93d86f Author: sevdokimov <sevdoki...@gridgain.com> Authored: Thu Feb 19 16:30:41 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Thu Feb 19 16:30:41 2015 +0300 ---------------------------------------------------------------------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 138 +++++++++++-------- .../discovery/tcp/TcpDiscoverySpiAdapter.java | 110 +++++++++++++++ 2 files changed, 193 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9ecd99f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index aef8259..3e9c120 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1395,65 +1395,83 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov return false; boolean retry = false; - IgniteCheckedException errs = null; + Collection<Exception> errs = new ArrayList<>(); - for (InetSocketAddress addr : addrs) { - try { - Integer res = sendMessageDirectly(joinReq, addr); + SocketMultiConnector multiConnector = new SocketMultiConnector(addrs, 2); - assert res != null; + try { + GridTuple3<InetSocketAddress, Socket, Exception> tuple; - noResAddrs.remove(addr); + while ((tuple = multiConnector.next()) != null) { + InetSocketAddress addr = tuple.get1(); + Socket sock = tuple.get2(); + Exception ex = tuple.get3(); - // Address is responsive, reset period start. - noResStart = 0; + if (ex == null) { + assert sock != null; - switch (res) { - case RES_WAIT: - // Concurrent startup, try sending join request again or wait if no success. - retry = true; + try { + Integer res = sendMessageDirectly(joinReq, addr, sock); - break; - case RES_OK: - if (log.isDebugEnabled()) - log.debug("Join request message has been sent to address [addr=" + addr + - ", req=" + joinReq + ']'); + assert res != null; - // Join request sending succeeded, wait for response from topology. - return true; + noResAddrs.remove(addr); - default: - // Concurrent startup, try next node. - if (res == RES_CONTINUE_JOIN) { - if (!fromAddrs.contains(addr)) + // Address is responsive, reset period start. + noResStart = 0; + + switch (res) { + case RES_WAIT: + // Concurrent startup, try sending join request again or wait if no success. retry = true; - } - else { - if (log.isDebugEnabled()) - log.debug("Unexpected response to join request: " + res); - retry = true; - } + break; + case RES_OK: + if (log.isDebugEnabled()) + log.debug("Join request message has been sent to address [addr=" + addr + + ", req=" + joinReq + ']'); - break; - } - } - catch (IgniteSpiException e) { - if (errs == null) - errs = new IgniteCheckedException("Multiple connection attempts failed."); + // Join request sending succeeded, wait for response from topology. + return true; - errs.addSuppressed(e); + default: + // Concurrent startup, try next node. + if (res == RES_CONTINUE_JOIN) { + if (!fromAddrs.contains(addr)) + retry = true; + } + else { + if (log.isDebugEnabled()) + log.debug("Unexpected response to join request: " + res); - if (log.isDebugEnabled()) { - IOException ioe = X.cause(e, IOException.class); + retry = true; + } - log.debug("Failed to send join request message [addr=" + addr + - ", msg=" + ioe != null ? ioe.getMessage() : e.getMessage() + ']'); + break; + } + } + catch (IgniteSpiException e) { + ex = e; + } } - noResAddrs.add(addr); + if (ex != null) { + errs.add(ex); + + if (log.isDebugEnabled()) { + IOException ioe = X.cause(ex, IOException.class); + + log.debug("Failed to send join request message [addr=" + addr + + ", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']'); + } + + noResAddrs.add(addr); + } } } + finally { + multiConnector.close(); + } if (retry) { if (log.isDebugEnabled()) @@ -1467,7 +1485,16 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } } else if (!ipFinder.isShared() && !ipFinderHasLocAddr) { - if (errs != null && X.hasCause(errs, ConnectException.class)) + IgniteCheckedException e = null; + + if (!errs.isEmpty()) { + e = new IgniteCheckedException("Multiple connection attempts failed."); + + for (Exception err : errs) + e.addSuppressed(err); + } + + if (e != null && X.hasCause(e, ConnectException.class)) LT.warn(log, null, "Failed to connect to any address from IP finder " + "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " + addrs); @@ -1480,14 +1507,14 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov "Failed to connect to any address from IP finder within join timeout " + "(make sure IP finder addresses are correct, and operating system firewalls are disabled " + "on all host machines, or consider increasing 'joinTimeout' configuration property): " + - addrs, errs); + addrs, e); } try { U.sleep(2000); } - catch (IgniteInterruptedCheckedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); + catch (IgniteInterruptedCheckedException ex) { + throw new IgniteSpiException("Thread has been interrupted.", ex); } } else @@ -1503,17 +1530,15 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param msg Message to send. * @param addr Address to send message to. * @return Response read from the recipient or {@code null} if no response is supposed. - * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. + * @throws IgniteSpiException If an error occurs. */ - @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr) + @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr, Socket sock) throws IgniteSpiException { assert msg != null; assert addr != null; Collection<Throwable> errs = null; - Socket sock = null; - long ackTimeout0 = ackTimeout; int connectAttempts = 1; @@ -1532,7 +1557,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov try { long tstamp = U.currentTimeMillis(); - sock = openSocket(addr); + if (sock == null) + sock = openSocket(addr); openSock = true; @@ -1612,6 +1638,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } finally { U.closeQuiet(sock); + + sock = null; } } @@ -1634,7 +1662,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * Marshalls credentials with discovery SPI marshaller (will replace attribute value). * * @param node Node to marshall credentials for. - * @throws org.apache.ignite.spi.IgniteSpiException If marshalling failed. + * @throws IgniteSpiException If marshalling failed. */ private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { try { @@ -1656,7 +1684,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * * @param node Node to unmarshall credentials for. * @return Security credentials. - * @throws org.apache.ignite.spi.IgniteSpiException If unmarshal fails. + * @throws IgniteSpiException If unmarshal fails. */ private GridSecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { try { @@ -3337,7 +3365,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * * @param node Node to send message to. * @param msg Message. - * @throws org.apache.ignite.spi.IgniteSpiException Last failure if all attempts failed. + * @throws IgniteSpiException Last failure if all attempts failed. */ private void trySendMessageDirectly(TcpDiscoveryNode node, TcpDiscoveryAbstractMessage msg) throws IgniteSpiException { @@ -3358,7 +3386,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov for (InetSocketAddress addr : getNodeAddresses(node, U.sameMacs(locNode, node))) { try { - sendMessageDirectly(msg, addr); + sendMessageDirectly(msg, addr, null); ex = null; @@ -4385,7 +4413,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** * Constructor. * - * @throws org.apache.ignite.spi.IgniteSpiException In case of error. + * @throws IgniteSpiException In case of error. */ TcpServer() throws IgniteSpiException { super(ignite.name(), "tcp-disco-srvr", log); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9ecd99f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java index 52156a4..87ee2fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -1005,4 +1006,113 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout); } } + + /** + * + */ + protected class SocketMultiConnector { + /** */ + private int connInProgress; + + /** */ + private boolean closed; + + /** */ + private final ExecutorService executor; + + /** */ + private final Queue<GridTuple3<InetSocketAddress, Socket, Exception>> queue = new LinkedList<>(); + + /** + * @param addrs Addresses. + * @param retryCnt Retry count. + */ + public SocketMultiConnector(Collection<InetSocketAddress> addrs, final int retryCnt) { + connInProgress = addrs.size(); + + executor = new ThreadPoolExecutor(0, 10, 1L, TimeUnit.MILLISECONDS, + new SynchronousQueue<Runnable>()); + + for (final InetSocketAddress addr : addrs) { + executor.execute(new Runnable() { + @Override public void run() { + Exception ex = null; + Socket sock = null; + + for (int i = 0; i < retryCnt; i++) { + synchronized (SocketMultiConnector.this) { + if (closed) + return; + } + + try { + sock = openSocket(addr); + + break; + } + catch (Exception e) { + ex = e; + } + } + + synchronized (SocketMultiConnector.this) { + if (closed) + U.closeQuiet(sock); + else + queue.add(new GridTuple3<>(addr, sock, ex)); + + connInProgress--; + + SocketMultiConnector.this.notifyAll(); + } + } + }); + } + } + + /** + * + */ + @Nullable public synchronized GridTuple3<InetSocketAddress, Socket, Exception> next() { + try { + do { + if (closed) + return null; + + GridTuple3<InetSocketAddress, Socket, Exception> res = queue.poll(); + + if (res != null) + return res; + + if (connInProgress == 0) + return null; + + wait(); + } + while (true); + } + catch (InterruptedException e) { + throw new IgniteSpiException("Thread has been interrupted.", e); + } + } + + /** + * + */ + public void close() { + synchronized (this) { + if (closed) + return; + + closed = true; + + notifyAll(); + } + + executor.shutdown(); + + for (GridTuple3<InetSocketAddress, Socket, Exception> tuple : queue) + U.closeQuiet(tuple.get2()); + } + } }