# ignite-sprint-6: fix race in TCP communication connection; retry full partition map send
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/285d790f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/285d790f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/285d790f Branch: refs/heads/master Commit: 285d790ff0ce597fde65a778c381e59a4f1c89aa Parents: 1605996 Author: sboikov <sboi...@gridgain.com> Authored: Mon Jun 22 16:40:08 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Jun 22 16:58:01 2015 +0300 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 49 +++++++++++++++----- .../communication/tcp/TcpCommunicationSpi.java | 12 +++++ .../GridTcpCommunicationSpiConfigSelfTest.java | 1 - 3 files changed, 49 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/285d790f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 9f18c98..7c780b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1078,18 +1078,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT log.debug("Received message for finished future (will reply only to sender) [msg=" + msg + ", 
fut=" + this + ']'); - try { - ClusterNode n = cctx.node(nodeId); - - if (n != null) - sendAllPartitions(F.asList(n), exchId); - } - catch (IgniteCheckedException e) { - scheduleRecheck(); - - U.error(log, "Failed to send full partition map to node (will retry after timeout) [node=" + nodeId + - ", exchangeId=" + exchId + ']', e); - } + sendAllPartitions(nodeId, cctx.gridConfig().getNetworkSendRetryCount()); } else { initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { @@ -1146,6 +1135,42 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @param nodeId Node ID. + * @param retryCnt Number of retries. + */ + private void sendAllPartitions(final UUID nodeId, final int retryCnt) { + ClusterNode n = cctx.node(nodeId); + + try { + if (n != null) + sendAllPartitions(F.asList(n), exchId); + } + catch (IgniteCheckedException e) { + if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) { + log.debug("Failed to send full partition map to node, node left grid " + + "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']'); + + return; + } + + if (retryCnt > 0) { + long timeout = cctx.gridConfig().getNetworkSendRetryDelay(); + + LT.error(log, e, "Failed to send full partition map to node (will retry after timeout) " + + "[node=" + nodeId + ", exchangeId=" + exchId + ", timeout=" + timeout + ']'); + + cctx.time().addTimeoutObject(new GridTimeoutObjectAdapter(timeout) { + @Override public void onTimeout() { + sendAllPartitions(nodeId, retryCnt - 1); + } + }); + } + else + U.error(log, "Failed to send full partition map [node=" + n + ", exchangeId=" + exchId + ']', e); + } + } + + /** * @param nodeId Sender node ID. * @param msg Full partition info. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/285d790f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 1035ee5..addf243d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1748,6 +1748,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert old == null : "Client already created " + "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']'; + + if (client0 instanceof GridTcpNioCommunicationClient) { + GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0); + + if (tcpClient.session().closeTime() > 0 && clients.remove(nodeId, client0)) { + if (log.isDebugEnabled()) + log.debug("Session was closed after client creation, will retry " + + "[node=" + node + ", client=" + client0 + ']'); + + client0 = null; + } + } } else U.sleep(200); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/285d790f/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java index c4a0916..4062931 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java @@ -37,7 +37,6 @@ public class GridTcpCommunicationSpiConfigSelfTest extends GridSpiAbstractConfig checkNegativeSpiProperty(new TcpCommunicationSpi(), "messageQueueLimit", -1); checkNegativeSpiProperty(new TcpCommunicationSpi(), "reconnectCount", 0); checkNegativeSpiProperty(new TcpCommunicationSpi(), "selectorsCount", 0); - checkNegativeSpiProperty(new TcpCommunicationSpi(), "minimumBufferedMessageCount", -1); checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectTimeout", -1); checkNegativeSpiProperty(new TcpCommunicationSpi(), "maxConnectTimeout", -1); checkNegativeSpiProperty(new TcpCommunicationSpi(), "socketWriteTimeout", -1);