ignite-752: keep reusing failure detection threshold
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1b6005a5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1b6005a5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1b6005a5 Branch: refs/heads/ignite-752 Commit: 1b6005a50584045cc11633541bc7b023ee399b32 Parents: 567aec1 Author: Denis Magda <dma...@gridgain.com> Authored: Wed Jul 15 17:11:45 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Wed Jul 15 17:11:45 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 61 ----------- .../IgniteSpiOperationTimeoutController.java | 93 ++++++++++++++++ .../spi/IgniteSpiOperationTimeoutException.java | 33 ++++++ .../ignite/spi/discovery/tcp/ServerImpl.java | 106 +++++++++++-------- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 4 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 16 ++- 6 files changed, 196 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 82ed3d0..422ce81 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -31,12 +31,10 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.plugin.security.*; import org.apache.ignite.resources.*; - import org.jetbrains.annotations.*; import javax.management.*; import java.io.*; -import java.net.*; import java.text.*; import java.util.*; @@ -581,51 +579,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** * TODO: IGNITE-752 - * @param dfltTimeout - * @return - */ - public long firstNetOperationTimeout(long dfltTimeout) { - return !failureDetectionThresholdEnabled ? dfltTimeout : failureDetectionThreshold; - } - - /** - * TODO: IGNITE-752 - * @param curTimeout - * @param lastOperStartTime - * @param dfltTimeout - * @return - * @throws IOException - */ - public long nextNetOperationTimeout(long curTimeout, long lastOperStartTime, long dfltTimeout) - throws NetOperationTimeoutException { - if (!failureDetectionThresholdEnabled) - return dfltTimeout; - - long timeLeft = curTimeout - lastOperStartTime; - - if (timeLeft <= 0) - throw new NetOperationTimeoutException("Network operation timed out. Increase failure detection threshold" + - " using IgniteConfiguration.setFailureDetectionThreshold() or set SPI specific timeouts manually." + - " Current failure detection threshold: " + failureDetectionThreshold); - - return timeLeft; - } - - /** - * TODO: IGNITE-752 - * @param e - * @return - */ - public boolean checkFailureDetectionThresholdReached(Exception e) { - if (!failureDetectionThresholdEnabled) - return false; - - return e instanceof NetOperationTimeoutException || e instanceof SocketTimeoutException || - X.hasCause(e, NetOperationTimeoutException.class, SocketException.class); - } - - /** - * TODO: IGNITE-752 * @param enabled */ public void failureDetectionThresholdEnabled(boolean enabled) { @@ -648,20 +601,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement return failureDetectionThreshold; } - - /** - * TODO: IGNITE-752 - */ - public static class NetOperationTimeoutException extends IgniteCheckedException { - /** - * Constructor. - * @param msg Error message. - */ - public NetOperationTimeoutException(String msg) { - super(msg); - } - } - /** * Temporarily SPI context. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java new file mode 100644 index 0000000..3ae4fa4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.spi; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.net.*; + +/** + * TODO: IGNITE-752 + */ +public class IgniteSpiOperationTimeoutController { + /** */ + private long lastOperStartTs; + + /** */ + private long timeout; + + /** */ + private final boolean failureDetectionThresholdEnabled; + + /** */ + private final long failureDetectionThreshold; + + /** + * Constructor. + * + * @param adapter SPI adapter. + */ + public IgniteSpiOperationTimeoutController(IgniteSpiAdapter adapter) { + failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled(); + failureDetectionThreshold = adapter.failureDetectionThreshold(); + } + + /** + * TODO: IGNITE-752 + * @param dfltTimeout + * @return + * @throws IgniteSpiOperationTimeoutException + */ + public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException { + if (!failureDetectionThresholdEnabled) + return dfltTimeout; + + if (lastOperStartTs == 0) { + timeout = failureDetectionThreshold; + lastOperStartTs = U.currentTimeMillis(); + } + else { + long curTs = U.currentTimeMillis(); + + timeout = timeout - (curTs - lastOperStartTs); + + lastOperStartTs = curTs; + + if (timeout <= 0) + throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase failure detection" + + " threshold using IgniteConfiguration.setFailureDetectionThreshold() or set SPI specific timeouts" + + " manually. Current failure detection threshold: " + failureDetectionThreshold); + } + + return timeout; + } + + /** + * TODO: IGNITE-752 + * @param e + * @return + */ + public boolean checkFailureDetectionThresholdReached(Exception e) { + if (!failureDetectionThresholdEnabled) + return false; + + return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException || + X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java new file mode 100644 index 0000000..c90b45b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi; + +import org.apache.ignite.*; + +/** + * TODO: IGNITE-752 + */ +public class IgniteSpiOperationTimeoutException extends IgniteCheckedException { + /** + * Constructor. + * @param msg Error message. + */ + public IgniteSpiOperationTimeoutException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 9dd565c..d506507 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -50,7 +50,6 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.*; import static org.apache.ignite.spi.IgnitePortProtocol.*; import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*; import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*; -import static org.apache.ignite.spi.IgniteSpiAdapter.NetOperationTimeoutException; /** * @@ -279,9 +278,10 @@ class ServerImpl extends TcpDiscoveryImpl { msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id())); synchronized (mux) { - long threshold = U.currentTimeMillis() + spi.netTimeout; + long timeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : + spi.getNetworkTimeout(); - long timeout = spi.netTimeout; + long threshold = U.currentTimeMillis() + timeout; while (spiState != LEFT && timeout > 0) { try { @@ -509,8 +509,7 @@ class ServerImpl extends TcpDiscoveryImpl { try { Socket sock = null; - long timeout = 0; - long lastOperStartTs = 0; + IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi); int reconCnt = 0; @@ -519,22 +518,15 @@ class ServerImpl extends TcpDiscoveryImpl { if (addr.isUnresolved()) addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()); - timeout = lastOperStartTs == 0 ? spi.firstNetOperationTimeout(spi.getSocketTimeout()) : - spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getSocketTimeout()); + long tstamp = U.currentTimeMillis(); - long tstamp = lastOperStartTs = U.currentTimeMillis(); + sock = spi.openSocket(addr, timeoutCtrl); - sock = spi.openSocket(addr, timeout); + spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId), + timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); - timeout = spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getSocketTimeout()); - lastOperStartTs = U.currentTimeMillis(); - - spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId), timeout); - - timeout = spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getNetworkTimeout()); - lastOperStartTs = U.currentTimeMillis(); - - TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeout); + TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk( + spi.getNetworkTimeout())); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -557,7 +549,7 @@ class ServerImpl extends TcpDiscoveryImpl { errs.add(e); - if (spi.checkFailureDetectionThresholdReached(e)) + if (timeoutCtrl.checkFailureDetectionThresholdReached(e)) break; else if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) break; @@ -690,9 +682,10 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Join request message has been sent (waiting for coordinator response)."); synchronized (mux) { - long threshold = U.currentTimeMillis() + spi.netTimeout; + long timeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : + spi.getNetworkTimeout(); - long timeout = spi.netTimeout; + long threshold = U.currentTimeMillis() + timeout; while (spiState == CONNECTING && timeout > 0) { try { @@ -734,8 +727,9 @@ class ServerImpl extends TcpDiscoveryImpl { LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " + "Check remote nodes logs for possible error messages. " + "Note that large topology may require significant time to start. " + - "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " + - "if getting this message on the starting nodes [networkTimeout=" + spi.netTimeout + ']'); + "Increase 'IgniteConfiguration.failureDetectionThreshold' configuration property " + + "if getting this message on the starting nodes [failureDetectionThreshold=" + + spi.failureDetectionThreshold() + ']'); } } @@ -855,10 +849,10 @@ class ServerImpl extends TcpDiscoveryImpl { "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " + addrs); - if (spi.joinTimeout > 0) { + if (spi.getJoinTimeout() > 0) { if (noResStart == 0) noResStart = U.currentTimeMillis(); - else if (U.currentTimeMillis() - noResStart > spi.joinTimeout) + else if (U.currentTimeMillis() - noResStart > spi.getJoinTimeout()) throw new IgniteSpiException( "Failed to connect to any address from IP finder within join timeout " + "(make sure IP finder addresses are correct, and operating system firewalls are disabled " + @@ -895,15 +889,17 @@ class ServerImpl extends TcpDiscoveryImpl { Collection<Throwable> errs = null; - long ackTimeout0 = spi.ackTimeout; + long ackTimeout0 = spi.getAckTimeout(); int connectAttempts = 1; - boolean joinReqSent = false; + boolean joinReqSent; UUID locNodeId = getLocalNodeId(); - for (int i = 0; i < spi.reconCnt; i++) { + IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi); + + while (true){ // Need to set to false on each new iteration, // since remote node may leave in the middle of the first iteration. joinReqSent = false; @@ -915,14 +911,16 @@ class ServerImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr); + sock = spi.openSocket(addr, timeoutCtrl); openSock = true; // Handshake. - spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); + spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutCtrl.nextTimeoutChunk( + spi.getSocketTimeout())); - TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk( + ackTimeout0)); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -936,7 +934,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Send message. tstamp = U.currentTimeMillis(); - spi.writeToSocket(sock, msg); + spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -953,7 +951,7 @@ class ServerImpl extends TcpDiscoveryImpl { // E.g. due to class not found issue. joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; - return spi.readReceipt(sock, ackTimeout0); + return spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0)); } catch (ClassCastException e) { // This issue is rarely reproducible on AmazonEC2, but never @@ -979,6 +977,9 @@ class ServerImpl extends TcpDiscoveryImpl { errs.add(e); + if (timeoutCtrl.checkFailureDetectionThresholdReached(e)) + break; + if (!openSock) { // Reconnect for the second time, if connection is not established. if (connectAttempts < 2) { @@ -990,7 +991,8 @@ class ServerImpl extends TcpDiscoveryImpl { break; // Don't retry if we can not establish connection. } - if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { + if (!spi.failureDetectionThresholdEnabled() && (e instanceof SocketTimeoutException || + X.hasCause(e, SocketTimeoutException.class))) { ackTimeout0 *= 2; if (!checkAckTimeout(ackTimeout0)) @@ -2029,7 +2031,7 @@ class ServerImpl extends TcpDiscoveryImpl { List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses()); addr: for (InetSocketAddress addr : spi.getNodeAddresses(next, sameHost)) { - long ackTimeout0 = spi.ackTimeout; + long ackTimeout0 = spi.getAckTimeout(); if (locNodeAddrs.contains(addr)){ if (log.isDebugEnabled()) @@ -2039,8 +2041,11 @@ class ServerImpl extends TcpDiscoveryImpl { continue; } - for (int i = 0; i < spi.reconCnt; i++) { + while (true) { if (sock == null) { + IgniteSpiOperationTimeoutController timeoutCrt = + new IgniteSpiOperationTimeoutController(spi); + nextNodeExists = false; boolean success = false; @@ -2051,14 +2056,16 @@ class ServerImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr); + sock = spi.openSocket(addr, timeoutCrt); openSock = true; // Handshake. - writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); + writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), + timeoutCrt.nextTimeoutChunk(spi.getNetworkTimeout())); - TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, + timeoutCrt.nextTimeoutChunk(ackTimeout0)); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -2142,8 +2149,10 @@ class ServerImpl extends TcpDiscoveryImpl { if (!openSock) break; // Don't retry if we can not establish connection. - if (e instanceof SocketTimeoutException || - X.hasCause(e, SocketTimeoutException.class)) { + if (timeoutCrt.checkFailureDetectionThresholdReached(e)) + break; + else if (!spi.failureDetectionThresholdEnabled() && (e instanceof + SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) { ackTimeout0 *= 2; if (!checkAckTimeout(ackTimeout0)) @@ -2173,6 +2182,8 @@ class ServerImpl extends TcpDiscoveryImpl { assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage; + IgniteSpiOperationTimeoutController timeoutCtrl; + if (failure || forceSndPending) { if (log.isDebugEnabled()) log.debug("Pending messages will be sent [failure=" + failure + @@ -2185,6 +2196,8 @@ class ServerImpl extends TcpDiscoveryImpl { boolean skip = pendingMsgs.discardId != null; for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) { + timeoutCtrl = new IgniteSpiOperationTimeoutController(spi); + if (skip) { if (pendingMsg.id().equals(pendingMsgs.discardId)) skip = false; @@ -2198,7 +2211,8 @@ class ServerImpl extends TcpDiscoveryImpl { pendingMsgs.discardId); try { - writeToSocket(sock, pendingMsg); + writeToSocket(sock, pendingMsg, timeoutCtrl.nextTimeoutChunk( + spi.getSocketTimeout())); } finally { clearNodeAddedMessage(pendingMsg); @@ -2206,7 +2220,7 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp); - int res = spi.readReceipt(sock, ackTimeout0); + int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0)); if (log.isDebugEnabled()) log.debug("Pending message has been sent to next node [msg=" + msg.id() + @@ -2222,6 +2236,7 @@ class ServerImpl extends TcpDiscoveryImpl { prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId); + timeoutCtrl try { long tstamp = U.currentTimeMillis(); @@ -4903,14 +4918,15 @@ class ServerImpl extends TcpDiscoveryImpl { /** * @param sock Socket. * @param msg Message. + * @param timeout Socket timeout. * @throws IOException If IO failed. * @throws IgniteCheckedException If marshalling failed. */ - protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) + protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { bout.reset(); - spi.writeToSocket(sock, msg, bout); + spi.writeToSocket(sock, msg, bout, timeout); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index ace917f..c4278ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -266,10 +266,10 @@ abstract class TcpDiscoveryImpl { * maximum acknowledgement timeout, {@code false} otherwise. */ protected boolean checkAckTimeout(long ackTimeout) { - if (ackTimeout > spi.maxAckTimeout) { + if (ackTimeout > spi.getMaxAckTimeout()) { LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " + "(consider increasing 'maxAckTimeout' configuration property) " + - "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']'); + "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.getMaxAckTimeout() + ']'); return false; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index e5d5cd6..126bf03 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -676,7 +676,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** * Sets IP finder for IP addresses sharing and storing. * <p> - * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default. + * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will + * be used by default. * * @param ipFinder IP finder. */ @@ -1103,12 +1104,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** * @param sockAddr Remote address. - * @param timeout Socket opening timeout. + * @param timeoutCtrl Timeout controller. * @return Opened socket. * @throws IOException If failed. */ - protected Socket openSocket(InetSocketAddress sockAddr, long timeout) throws IOException, - NetOperationTimeoutException { + protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutController timeoutCtrl) + throws IOException, IgniteSpiOperationTimeoutException { assert sockAddr != null; InetSocketAddress resolved = sockAddr.isUnresolved() ? @@ -1126,11 +1127,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T long startTs = U.currentTimeMillis(); - sock.connect(resolved, (int)timeout); + sock.connect(resolved, (int)timeoutCtrl.nextTimeoutChunk(sockTimeout)); - timeout = nextNetOperationTimeout(timeout, startTs, sockTimeout); - - writeToSocket(sock, U.IGNITE_HEADER, timeout); + writeToSocket(sock, U.IGNITE_HEADER, timeoutCtrl.nextTimeoutChunk(sockTimeout)); return sock; } @@ -1148,7 +1147,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T assert sock != null; assert data != null; - //SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout); addTimeoutObject(obj);