http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 1911b5b,0b3330a..2c7f8be --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@@ -1758,35 -1428,22 +1758,35 @@@ public class TcpCommunicationSpi extend GridCommunicationClient client = null; try { - client = reserveClient(node); + boolean retry; + + do { + client = reserveClient(node); - UUID nodeId = null; + UUID nodeId = null; - if (!client.async() && !getSpiContext().localNode().version().equals(node.version())) - nodeId = node.id(); + if (!client.async() && !getSpiContext().localNode().version().equals(node.version())) + nodeId = node.id(); - client.sendMessage(nodeId, msg); + retry = client.sendMessage(nodeId, msg); - client.release(); + client.release(); - client = null; + client = null; + + if (!retry) + sentMsgsCnt.increment(); + else { + ClusterNode node0 = getSpiContext().node(node.id()); - sentMsgsCnt.increment(); + if (node0 == null) + throw new GridException("Failed to send message to remote node " + + "(node has left the grid): " + node.id()); + } + } + while (retry); } - catch (GridException e) { + catch (IgniteCheckedException e) { throw new IgniteSpiException("Failed to send message to remote node: " + node, e); } finally { @@@ -2196,19 -1812,12 +2196,19 @@@ * Performs handshake in timeout-safe way. * * @param client Client. + * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}. * @param rmtNodeId Remote node. * @param timeout Timeout for handshake. - * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout. + * @throws GridException If handshake failed or wasn't completed withing timeout. + * @return Handshake response. */ @SuppressWarnings("ThrowFromFinallyBlock") - private <T> void safeHandshake(T client, UUID rmtNodeId, long timeout) throws IgniteCheckedException { + private <T> long safeHandshake( + T client, + @Nullable GridNioRecoveryDescriptor recovery, + UUID rmtNodeId, + long timeout - ) throws GridException { ++ ) throws IgniteCheckedException { HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); sockTimeoutWorker.addTimeoutObject(obj); @@@ -2373,56 -1901,8 +2373,56 @@@ return S.toString(TcpCommunicationSpi.class, this); } + /** + * + */ + private static class ClientKey { + /** */ + private UUID nodeId; + + /** */ + private long order; + + /** + * @param nodeId Node ID. + * @param order Node order. + */ + private ClientKey(UUID nodeId, long order) { + this.nodeId = nodeId; + this.order = order; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + ClientKey other = (ClientKey)obj; + + return order == other.order && nodeId.equals(other.nodeId); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = nodeId.hashCode(); + + res = 31 * res + (int)(order ^ (order >>> 32)); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClientKey.class, this); + } + } + /** Internal exception class for proper timeout handling. */ - private static class HandshakeTimeoutException extends GridException { + private static class HandshakeTimeoutException extends IgniteCheckedException { /** */ private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java index 5b0db53,6404002..6c9631e --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java @@@ -86,10 -86,9 +86,10 @@@ public interface GridCommunicationClien /** * @param nodeId Node ID (provided only if versions of local and remote nodes are different). * @param msg Message to send. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. + * @return {@code True} if should try to resend message. */ - boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws GridException; - void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws IgniteCheckedException; ++ boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws IgniteCheckedException; /** * @param timeout Timeout. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java index a3ab1ef,1754db7..19de132 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java @@@ -27,12 -28,12 +28,12 @@@ public interface GridNioFuture<R> * returns operation result. * * @return Operation result. - * @throws GridInterruptedException Subclass of {@link IgniteCheckedException} thrown if the wait was interrupted. - * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link IgniteCheckedException} throws if operation was cancelled. + * @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted. + * @throws IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled. - * @throws GridException If operation failed. + * @throws IgniteCheckedException If operation failed. * @throws IOException If IOException occurred while performing operation. */ - public R get() throws IOException, GridException; + public R get() throws IOException, IgniteCheckedException; /** * Synchronously waits for completion of the operation for @@@ -41,13 -42,13 +42,13 @@@ * * @param timeout The maximum time to wait in milliseconds. * @return Operation result. - * @throws GridInterruptedException Subclass of {@link IgniteCheckedException} thrown if the wait was interrupted. - * @throws org.apache.ignite.lang.IgniteFutureTimeoutException Subclass of {@link IgniteCheckedException} thrown if the wait was timed out. - * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link IgniteCheckedException} throws if operation was cancelled. + * @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted. + * @throws IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out. + * @throws IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled. - * @throws GridException If operation failed. + * @throws IgniteCheckedException If operation failed. * @throws IOException If IOException occurred while performing operation. */ - public R get(long timeout) throws IOException, GridException; + public R get(long timeout) throws IOException, IgniteCheckedException; /** * Synchronously waits for completion of the operation for @@@ -56,13 -57,13 +57,13 @@@ * @param timeout The maximum time to wait. * @param unit The time unit of the {@code timeout} argument. * @return Operation result. - * @throws GridInterruptedException Subclass of {@link IgniteCheckedException} thrown if the wait was interrupted. - * @throws org.apache.ignite.lang.IgniteFutureTimeoutException Subclass of {@link IgniteCheckedException} thrown if the wait was timed out. - * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link IgniteCheckedException} throws if operation was cancelled. + * @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted. + * @throws IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out. + * @throws IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled. - * @throws GridException If operation failed. + * @throws IgniteCheckedException If operation failed. * @throws IOException If IOException occurred while performing operation. */ - public R get(long timeout, TimeUnit unit) throws IOException, GridException; + public R get(long timeout, TimeUnit unit) throws IOException, IgniteCheckedException; /** * Cancels this future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java index 7ab2e14,6c5a6bc..ee84796 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java @@@ -118,10 -119,10 +119,10 @@@ public class GridNioFutureImpl<R> exten * @param nanosTimeout Timeout (nanoseconds). * @return Result. * @throws InterruptedException If interrupted. - * @throws org.apache.ignite.lang.IgniteFutureTimeoutException If timeout reached before computation completed. + * @throws IgniteFutureTimeoutException If timeout reached before computation completed. - * @throws GridException If error occurred. + * @throws IgniteCheckedException If error occurred. */ - @Nullable protected R get0(long nanosTimeout) throws InterruptedException, GridException { + @Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException { if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout)) throw new IgniteFutureTimeoutException("Timeout was reached before computation completed."); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioServer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSessionImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java index 505c788,8777405..501e7ee --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java @@@ -103,10 -103,10 +103,10 @@@ public class GridShmemCommunicationClie } /** {@inheritDoc} */ - @Override public synchronized void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) + @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) - throws GridException { + throws IgniteCheckedException { if (closed()) - throw new GridException("Communication client was closed: " + this); + throw new IgniteCheckedException("Communication client was closed: " + this); assert writeBuf.hasArray(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java index fbca363,a20ea24..632ce35 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java @@@ -182,10 -182,10 +182,10 @@@ public class GridTcpCommunicationClien } /** {@inheritDoc} */ - @Override public void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) + @Override public boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) - throws GridException { + throws IgniteCheckedException { if (closed()) - throw new GridException("Client was closed: " + this); + throw new IgniteCheckedException("Client was closed: " + this); assert writeBuf.hasArray(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java index 55997d3,49053d3..3d8668d --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java @@@ -96,8 -98,8 +95,8 @@@ public class GridTcpNioCommunicationCli } /** {@inheritDoc} */ - @Override public void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) + @Override public boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) - throws GridException { + throws IgniteCheckedException { // Node ID is never provided in asynchronous send mode. assert nodeId == null; @@@ -108,23 -113,9 +107,23 @@@ fut.get(); } catch (IOException e) { - throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e); + if (log.isDebugEnabled()) + log.debug("Failed to send message [client=" + this + ", err=" +e + ']'); + + return true; + } - catch (GridException e) { ++ catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send message [client=" + this + ", err=" +e + ']'); + + if (e.getCause() instanceof IOException) + return true; + else - throw new GridException("Failed to send message [client=" + this + ']', e); ++ throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e); } } + + return false; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java index 86d68a7,0dfb5fd..07b5059 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java @@@ -9,13 -9,13 +9,13 @@@ package org.apache.ignite.spi.communication; -import mx4j.tools.adaptor.http.*; + import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; - import org.gridgain.grid.*; import org.gridgain.grid.util.direct.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.*; -import org.gridgain.testframework.config.*; import org.gridgain.testframework.junits.*; import org.gridgain.testframework.junits.spi.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java index 0d650fb,80f2226..1889cd2 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java @@@ -9,12 -9,13 +9,13 @@@ package org.apache.ignite.spi.communication.tcp; -import mx4j.tools.adaptor.http.*; + import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; - import org.gridgain.grid.*; import org.apache.ignite.spi.communication.*; + import org.gridgain.grid.*; import org.gridgain.grid.util.direct.*; +import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.nio.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/test/java/org/gridgain/grid/kernal/managers/GridManagerStopSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/test/java/org/gridgain/grid/util/nio/impl/GridNioFilterChainSelfTest.java ----------------------------------------------------------------------