http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index b7d6e3f..6130bd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -65,12 +65,20 @@ import java.util.concurrent.atomic.*; * and then this info goes to coordinator. When coordinator processes join request * and issues node added messages and all other nodes then receive info about new node. * <h1 class="header">Failure Detection</h1> - * Configuration defaults (see Configuration section below for details) - * are chosen to make possible for discovery SPI work reliably on - * most of hardware and virtual deployments, but this has made failure detection time worse. + * Configuration defaults (see Configuration section below and + * {@link IgniteConfiguration#getFailureDetectionTimeout()}) for details) are chosen to make possible for discovery + * SPI work reliably on most of hardware and virtual deployments, but this has made failure detection time worse. * <p> - * For stable low-latency networks the following more aggressive settings are recommended - * (which allows failure detection time ~200ms): + * If it's needed to tune failure detection then it's highly recommended to do this using + * {@link IgniteConfiguration#setFailureDetectionTimeout(long)}. This failure timeout automatically controls the + * following parameters: {@link #getSocketTimeout()}, {@link #getAckTimeout()}, {@link #getMaxAckTimeout()}, + * {@link #getReconnectCount()}. If any of those parameters is set explicitly, then the failure timeout setting will be + * ignored. + * <p> + * If it's required to perform advanced settings of failure detection and + * {@link IgniteConfiguration#getFailureDetectionTimeout()} is unsuitable then various {@code TcpDiscoverySpi} + * configuration parameters may be used. As an example, for stable low-latency networks the following more aggressive + * settings are recommended (which allows failure detection time ~200ms): * <ul> * <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)}) - 100ms</li> * <li>Socket timeout (see {@link #setSocketTimeout(long)}) - 200ms</li> @@ -157,6 +165,15 @@ import java.util.concurrent.atomic.*; @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean { + /** Failure detection timeout feature major version. */ + final static byte FAILURE_DETECTION_MAJOR_VER = 1; + + /** Failure detection timeout feature minor version. */ + final static byte FAILURE_DETECTION_MINOR_VER = 4; + + /** Failure detection timeout feature maintainance version. */ + final static byte FAILURE_DETECTION_MAINT_VER = 1; + /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */ public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs"; @@ -221,10 +238,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T protected TcpDiscoveryIpFinder ipFinder; /** Socket operations timeout. */ - protected long sockTimeout; // Must be initialized in the constructor of child class. + private long sockTimeout; // Must be initialized in the constructor of child class. /** Message acknowledgement timeout. */ - protected long ackTimeout; // Must be initialized in the constructor of child class. + private long ackTimeout; // Must be initialized in the constructor of child class. /** Network timeout. */ protected long netTimeout = DFLT_NETWORK_TIMEOUT; @@ -286,14 +303,14 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** Reconnect attempts count. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - protected int reconCnt = DFLT_RECONNECT_CNT; + private int reconCnt = DFLT_RECONNECT_CNT; /** Statistics print frequency. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"}) protected long statsPrintFreq = DFLT_STATS_PRINT_FREQ; /** Maximum message acknowledgement timeout. */ - protected long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT; + private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT; /** Max heartbeats count node can miss without initiating status check. */ protected int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS; @@ -521,6 +538,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * on every retry. * <p> * If not specified, default is {@link #DFLT_RECONNECT_CNT}. + * <p> + * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param reconCnt Number of retries during message sending. * @see #setAckTimeout(long) @@ -529,6 +548,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T public TcpDiscoverySpi setReconnectCount(int reconCnt) { this.reconCnt = reconCnt; + failureDetectionTimeoutEnabled(false); + return this; } @@ -548,6 +569,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * If not specified, default is {@link #DFLT_MAX_ACK_TIMEOUT}. * <p> * Affected server nodes only. + * <p> + * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param maxAckTimeout Maximum acknowledgement timeout. */ @@ -555,6 +578,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T public TcpDiscoverySpi setMaxAckTimeout(long maxAckTimeout) { this.maxAckTimeout = maxAckTimeout; + failureDetectionTimeoutEnabled(false); + return this; } @@ -701,7 +726,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** * Sets IP finder for IP addresses sharing and storing. * <p> - * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default. + * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will + * be used by default. * * @param ipFinder IP finder. */ @@ -720,6 +746,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * significantly greater than the default (e.g. to {@code 30000}). * <p> * If not specified, default is {@link #DFLT_SOCK_TIMEOUT} or {@link #DFLT_SOCK_TIMEOUT_CLIENT}. + * <p> + * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param sockTimeout Socket connection timeout. */ @@ -727,6 +755,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T public TcpDiscoverySpi setSocketTimeout(long sockTimeout) { this.sockTimeout = sockTimeout; + failureDetectionTimeoutEnabled(false); + return this; } @@ -737,6 +767,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * and SPI tries to repeat message sending. * <p> * If not specified, default is {@link #DFLT_ACK_TIMEOUT} or {@link #DFLT_ACK_TIMEOUT_CLIENT}. + * <p> + * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param ackTimeout Acknowledgement timeout. */ @@ -744,6 +776,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T public TcpDiscoverySpi setAckTimeout(long ackTimeout) { this.ackTimeout = ackTimeout; + failureDetectionTimeoutEnabled(false); + return this; } @@ -1123,10 +1157,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** * @param sockAddr Remote address. + * @param timeoutHelper Timeout helper. * @return Opened socket. * @throws IOException If failed. */ - protected Socket openSocket(InetSocketAddress sockAddr) throws IOException { + protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) + throws IOException, IgniteSpiOperationTimeoutException { assert sockAddr != null; InetSocketAddress resolved = sockAddr.isUnresolved() ? @@ -1142,9 +1178,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T sock.setTcpNoDelay(true); - sock.connect(resolved, (int)sockTimeout); + sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout)); - writeToSocket(sock, U.IGNITE_HEADER); + writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); return sock; } @@ -1154,14 +1190,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * * @param sock Socket. * @param data Raw data to write. + * @param timeout Socket write timeout. * @throws IOException If IO failed or write timed out. */ @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, byte[] data) throws IOException { + private void writeToSocket(Socket sock, byte[] data, long timeout) throws IOException { assert sock != null; assert data != null; - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); + SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout); addTimeoutObject(obj); @@ -1197,11 +1234,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * * @param sock Socket. * @param msg Message. + * @param timeout Socket write timeout. * @throws IOException If IO failed or write timed out. * @throws IgniteCheckedException If marshalling failed. */ - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, IgniteCheckedException { - writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K. + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) + throws IOException, IgniteCheckedException { + writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K. } /** @@ -1214,8 +1253,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * @throws IgniteCheckedException If marshalling failed. */ @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout) - throws IOException, IgniteCheckedException { + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout, + long timeout) throws IOException, IgniteCheckedException { assert sock != null; assert msg != null; assert bout != null; @@ -1223,7 +1262,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T // Marshall message first to perform only write after. marsh.marshal(msg, bout); - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); + SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout); addTimeoutObject(obj); @@ -1260,13 +1299,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * @param msg Received message. * @param sock Socket. * @param res Integer response. + * @param timeout Socket timeout. * @throws IOException If IO failed or write timed out. */ @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException { + protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout) + throws IOException { assert sock != null; - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); + SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout); addTimeoutObject(obj); @@ -1307,7 +1348,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * @throws IOException If IO failed or read timed out. * @throws IgniteCheckedException If unmarshalling failed. */ - protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException { + protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, + IgniteCheckedException { assert sock != null; int oldTimeout = sock.getSoTimeout(); @@ -1315,7 +1357,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T try { sock.setSoTimeout((int)timeout); - return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader()); + T res = marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader()); + + impl.onDataReceived(); + + return res; } catch (IOException | IgniteCheckedException e) { if (X.hasCause(e, SocketTimeoutException.class)) @@ -1356,6 +1402,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T if (res == -1) throw new EOFException(); + impl.onDataReceived(); + return res; } catch (SocketTimeoutException e) { @@ -1570,6 +1618,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** {@inheritDoc} */ @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { + initFailureDetectionTimeout(); + if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) { if (ackTimeout == 0) ackTimeout = DFLT_ACK_TIMEOUT_CLIENT; @@ -1591,18 +1641,21 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T impl = new ServerImpl(this); } + if (!failureDetectionTimeoutEnabled()) { + assertParameter(sockTimeout > 0, "sockTimeout > 0"); + assertParameter(ackTimeout > 0, "ackTimeout > 0"); + assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout"); + assertParameter(reconCnt > 0, "reconnectCnt > 0"); + } + + assertParameter(netTimeout > 0, "networkTimeout > 0"); assertParameter(ipFinder != null, "ipFinder != null"); assertParameter(hbFreq > 0, "heartbeatFreq > 0"); - assertParameter(netTimeout > 0, "networkTimeout > 0"); - assertParameter(sockTimeout > 0, "sockTimeout > 0"); - assertParameter(ackTimeout > 0, "ackTimeout > 0"); assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0"); assertParameter(locPort > 1023, "localPort > 1023"); assertParameter(locPortRange >= 0, "localPortRange >= 0"); assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff"); - assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout"); - assertParameter(reconCnt > 0, "reconnectCnt > 0"); assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0"); assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0"); assertParameter(threadPri > 0, "threadPri > 0"); @@ -1620,11 +1673,20 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T log.debug(configInfo("localPort", locPort)); log.debug(configInfo("localPortRange", locPortRange)); log.debug(configInfo("threadPri", threadPri)); - log.debug(configInfo("networkTimeout", netTimeout)); - log.debug(configInfo("sockTimeout", sockTimeout)); - log.debug(configInfo("ackTimeout", ackTimeout)); - log.debug(configInfo("maxAckTimeout", maxAckTimeout)); - log.debug(configInfo("reconnectCount", reconCnt)); + + if (!failureDetectionTimeoutEnabled()) { + log.debug("Failure detection timeout is ignored because at least one of the parameters from this list" + + " has been set explicitly: 'sockTimeout', 'ackTimeout', 'maxAckTimeout', 'reconnectCount'."); + + log.debug(configInfo("networkTimeout", netTimeout)); + log.debug(configInfo("sockTimeout", sockTimeout)); + log.debug(configInfo("ackTimeout", ackTimeout)); + log.debug(configInfo("maxAckTimeout", maxAckTimeout)); + log.debug(configInfo("reconnectCount", reconCnt)); + } + else + log.debug(configInfo("failureDetectionTimeout", failureDetectionTimeout())); + log.debug(configInfo("ipFinder", ipFinder)); log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq)); log.debug(configInfo("heartbeatFreq", hbFreq)); @@ -1837,7 +1899,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T U.closeQuiet(sock); LT.warn(log, null, "Socket write has timed out (consider increasing " + - "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']'); + (failureDetectionTimeoutEnabled() ? + "'IgniteConfiguration.failureDetectionTimeout' configuration property) [" + + "failureDetectionTimeout=" + failureDetectionTimeout() + ']' : + "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']')); stats.onSocketTimeout(); }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 142dbea..44e9006 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -89,6 +89,9 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste @GridToStringExclude private volatile long lastUpdateTime = U.currentTimeMillis(); + /** The most recent time when a data chunk was received from a node. */ + private volatile long lastDataReceivedTime = U.currentTimeMillis(); + /** Metrics provider (transient). */ @GridToStringExclude private DiscoveryMetricsProvider metricsProvider; @@ -390,6 +393,24 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste } /** + * Gets the last time a node received a data chunk from a remote node. + * + * @return Time in milliseconds. + */ + public long lastDataReceivedTime() { + return lastDataReceivedTime; + } + + /** + * Sets the last time a node receive a data chunk from a remote node in a topology. + * + * @param lastDataReceivedTime Time in milliseconds. + */ + public void lastDataReceivedTime(long lastDataReceivedTime) { + this.lastDataReceivedTime = lastDataReceivedTime; + } + + /** * Gets visible flag. * * @return {@code true} if node is in visible state. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java new file mode 100644 index 0000000..c7e99c8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; + +import java.io.*; + +/** + * Message used to check whether a node is still connected to the topology. + * The difference from {@link TcpDiscoveryStatusCheckMessage} is that this message is sent to the next node + * which directly replies to the sender without message re-translation to the coordinator. + */ +public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Default no-arg constructor for {@link Externalizable} interface. + */ + public TcpDiscoveryConnectionCheckMessage() { + // No-op. + } + + /** + * Constructor. + * + * @param creatorNode Node created this message. + */ + public TcpDiscoveryConnectionCheckMessage(TcpDiscoveryNode creatorNode) { + super(creatorNode.id()); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // This method has been left empty intentionally to keep message size at min. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // This method has been left empty intentionally to keep message size at min. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryConnectionCheckMessage.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index fbaea11..7247d54 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -281,7 +281,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra volatile CountDownLatch writeLatch; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryJoinRequestMessage) { CountDownLatch writeLatch0 = writeLatch; @@ -293,7 +293,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra } } - super.writeToSocket(sock, msg); + super.writeToSocket(sock, msg, timeout); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java index eee38a5..538ead5 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java @@ -79,7 +79,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica for (CommunicationSpi spi : spis.values()) { ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients"); - assertEquals(2, clients.size()); + assertEquals(getSpiCount() - 1, clients.size()); clients.put(UUID.randomUUID(), F.first(clients.values())); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index 1a4ba22..b4090d0 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -166,7 +166,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS @Override public boolean apply() { return recoveryDesc.messagesFutures().isEmpty(); } - }, 10_000); + }, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() + 7000 : + 10_000); assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0, recoveryDesc.messagesFutures().size()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java new file mode 100644 index 0000000..a6bfe00 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.configuration.*; + +/** + * + */ +public class GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest extends GridTcpCommunicationSpiRecoverySelfTest { + /** {@inheritDoc} */ + @Override protected TcpCommunicationSpi getSpi(int idx) { + TcpCommunicationSpi spi = new TcpCommunicationSpi(); + + spi.setSharedMemoryPort(-1); + spi.setLocalPort(port++); + spi.setIdleConnectionTimeout(10_000); + spi.setAckSendThreshold(5); + spi.setSocketSendBuffer(512); + spi.setSocketReceiveBuffer(512); + + return spi; + } + + /** {@inheritDoc} */ + @Override protected long awaitForSocketWriteTimeout() { + return IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT + 5_000; + } + + /** + * @throws Exception if failed. + */ + public void testFailureDetectionEnabled() throws Exception { + for (TcpCommunicationSpi spi: spis) { + assertTrue(spi.failureDetectionTimeoutEnabled()); + assertTrue(spi.failureDetectionTimeout() == IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java index 5d3afd9..67d42d3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java @@ -60,7 +60,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> private static final int ITERS = 10; /** */ - private static int port = 30_000; + protected static int port = 30_000; /** * @@ -163,6 +163,15 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> } /** + * Time to wait for socket write timeout. + * + * @return Timeout. + */ + protected long awaitForSocketWriteTimeout() { + return 5000; + } + + /** * @throws Exception If failed. */ public void testBlockListener() throws Exception { @@ -245,7 +254,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> @Override public boolean apply() { return lsnr0.rcvCnt.get() >= expMsgs && lsnr1.rcvCnt.get() >= expMsgs; } - }, 5000); + }, awaitForSocketWriteTimeout()); assertEquals(expMsgs, lsnr0.rcvCnt.get()); assertEquals(expMsgs, lsnr1.rcvCnt.get()); @@ -301,7 +310,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> @Override public boolean apply() { return ses0.closeTime() != 0; } - }, 5000); + }, awaitForSocketWriteTimeout()); assertTrue("Failed to wait for session close", ses0.closeTime() != 0); @@ -411,7 +420,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> @Override public boolean apply() { return ses0.closeTime() != 0; } - }, 5000); + }, awaitForSocketWriteTimeout()); assertTrue("Failed to wait for session close", ses0.closeTime() != 0); @@ -423,7 +432,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> public boolean apply() { return ses1.closeTime() != 0; } - }, 5000); + }, awaitForSocketWriteTimeout()); assertTrue("Failed to wait for session close", ses1.closeTime() != 0); @@ -528,7 +537,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> @Override public boolean apply() { return ses0.closeTime() != 0; } - }, 5000); + }, awaitForSocketWriteTimeout()); assertTrue("Failed to wait for session close", ses0.closeTime() != 0); @@ -592,7 +601,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> return !sessions.isEmpty(); } - }, 5000); + }, awaitForSocketWriteTimeout()); Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java new file mode 100644 index 0000000..56873d1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.communication.*; + +/** + * + */ +public class GridTcpCommunicationSpiTcpFailureDetectionSelfTest extends GridTcpCommunicationSpiTcpSelfTest { + /** */ + private final static int SPI_COUNT = 4; + + private TcpCommunicationSpi spis[] = new TcpCommunicationSpi[SPI_COUNT]; + + /** {@inheritDoc} */ + @Override protected int getSpiCount() { + return SPI_COUNT; + } + + /** {@inheritDoc} */ + @Override protected CommunicationSpi getSpi(int idx) { + TcpCommunicationSpi spi = (TcpCommunicationSpi)super.getSpi(idx); + + switch (idx) { + case 0: + // Ignore + break; + case 1: + spi.setConnectTimeout(4000); + break; + case 2: + spi.setMaxConnectTimeout(TcpCommunicationSpi.DFLT_MAX_CONN_TIMEOUT); + break; + case 3: + spi.setReconnectCount(2); + break; + default: + assert false; + } + + spis[idx] = spi; + + return spi; + } + + /** + * @throws Exception if failed. + */ + public void testFailureDetectionEnabled() throws Exception { + assertTrue(spis[0].failureDetectionTimeoutEnabled()); + assertTrue(spis[0].failureDetectionTimeout() == IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT); + + for (int i = 1; i < SPI_COUNT; i++) { + assertFalse(spis[i].failureDetectionTimeoutEnabled()); + assertEquals(0, spis[i].failureDetectionTimeout()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java index 61bb944..892d87d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java @@ -43,12 +43,18 @@ import static org.apache.ignite.lang.IgniteProductVersion.*; @SuppressWarnings({"JUnitAbstractTestClassNamingConvention"}) public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends GridSpiAbstractTest<T> { /** */ - private static final List<DiscoverySpi> spis = new ArrayList<>(); + private static final String HTTP_ADAPTOR_MBEAN_NAME = "mbeanAdaptor:protocol=HTTP"; + + /** */ + protected static final List<DiscoverySpi> spis = new ArrayList<>(); /** */ private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>(); /** */ + private static final List<HttpAdaptor> httpAdaptors = new ArrayList<>(); + + /** */ private static long spiStartTime; /** */ @@ -424,10 +430,12 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri adaptor.setPort(Integer.valueOf(GridTestProperties.getProperty("discovery.mbeanserver.selftest.baseport")) + idx); - srv.registerMBean(adaptor, new ObjectName("mbeanAdaptor:protocol=HTTP")); + srv.registerMBean(adaptor, new ObjectName(HTTP_ADAPTOR_MBEAN_NAME)); adaptor.start(); + httpAdaptors.add(adaptor); + return srv; } @@ -442,12 +450,21 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri spi.spiStop(); } - for (IgniteTestResources rscrs : spiRsrcs) + for (IgniteTestResources rscrs : spiRsrcs) { + MBeanServer mBeanServer = rscrs.getMBeanServer(); + + mBeanServer.unregisterMBean(new ObjectName(HTTP_ADAPTOR_MBEAN_NAME)); + rscrs.stopThreads(); + } + + for (HttpAdaptor adaptor : httpAdaptors) + adaptor.stop(); // Clear. spis.clear(); spiRsrcs.clear(); + httpAdaptors.clear(); spiStartTime = 0; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java new file mode 100644 index 0000000..3cf44f2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; + +/** + * Client-based discovery SPI test with failure detection timeout enabled. + */ +public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscoverySpiSelfTest { + /** */ + private final static int FAILURE_AWAIT_TIME = 7_000; + + /** */ + private final static long FAILURE_THRESHOLD = 10_000; + + /** */ + private static long failureThreshold = FAILURE_THRESHOLD; + + /** */ + private static boolean useTestSpi; + + /** {@inheritDoc} */ + @Override protected boolean useFailureDetectionTimeout() { + return true; + } + + /** {@inheritDoc} */ + @Override protected long failureDetectionTimeout() { + return failureThreshold; + } + + /** {@inheritDoc} */ + @Override protected long awaitTime() { + return failureDetectionTimeout() + FAILURE_AWAIT_TIME; + } + + /** {@inheritDoc} */ + @Override protected TcpDiscoverySpi getDiscoverySpi() { + return useTestSpi ? new TestTcpDiscoverySpi() : super.getDiscoverySpi(); + } + + /** + * @throws Exception in case of error. + */ + public void testFailureDetectionTimeoutEnabled() throws Exception { + startServerNodes(1); + startClientNodes(1); + + checkNodes(1, 1); + + assertTrue(((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())). + failureDetectionTimeoutEnabled()); + assertEquals(failureDetectionTimeout(), + ((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())).failureDetectionTimeout()); + + assertTrue(((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())). + failureDetectionTimeoutEnabled()); + assertEquals(failureDetectionTimeout(), + ((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).failureDetectionTimeout()); + } + + /** + * @throws Exception in case of error. + */ + public void testFailureTimeoutWorkabilityAvgTimeout() throws Exception { + failureThreshold = 3000; + + try { + checkFailureThresholdWorkability(); + } + finally { + failureThreshold = FAILURE_THRESHOLD; + } + } + + /** + * @throws Exception in case of error. + */ + public void testFailureTimeoutWorkabilitySmallTimeout() throws Exception { + failureThreshold = 500; + + try { + checkFailureThresholdWorkability(); + } + finally { + failureThreshold = FAILURE_THRESHOLD; + } + } + + /** + * @throws Exception in case of error. + */ + private void checkFailureThresholdWorkability() throws Exception { + useTestSpi = true; + + TestTcpDiscoverySpi firstSpi = null; + TestTcpDiscoverySpi secondSpi = null; + + try { + startServerNodes(2); + + checkNodes(2, 0); + + firstSpi = (TestTcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi()); + secondSpi = (TestTcpDiscoverySpi)(G.ignite("server-1").configuration().getDiscoverySpi()); + + assert firstSpi.err == null; + + secondSpi.readDelay = failureDetectionTimeout() + 5000; + + assertFalse(firstSpi.pingNode(secondSpi.getLocalNodeId())); + + Thread.sleep(failureDetectionTimeout()); + + assertTrue(firstSpi.err != null && X.hasCause(firstSpi.err, SocketTimeoutException.class)); + + firstSpi.reset(); + secondSpi.reset(); + + assertTrue(firstSpi.pingNode(secondSpi.getLocalNodeId())); + + assertTrue(firstSpi.err == null); + } + finally { + useTestSpi = false; + + if (firstSpi != null) + firstSpi.reset(); + + if (secondSpi != null) + secondSpi.reset(); + } + } + + /** + * + */ + private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + private long readDelay; + + /** */ + private Exception err; + + /** {@inheritDoc} */ + @Override protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) + throws IOException, IgniteCheckedException { + + if (readDelay < failureDetectionTimeout()) { + try { + return super.readMessage(sock, in, timeout); + } + catch (Exception e) { + err = e; + + throw e; + } + } + else { + T msg = super.readMessage(sock, in, timeout); + + if (msg instanceof TcpDiscoveryPingRequest) { + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + // Ignore + } + throw new SocketTimeoutException("Forced timeout"); + } + + return msg; + } + } + + /** + * Resets testing state. + */ + private void reset() { + readDelay = 0; + err = null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 63db0c1..69a5f13 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -118,7 +118,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi(); + TcpDiscoverySpi disco = getDiscoverySpi(); disco.setMaxMissedClientHeartbeats(maxMissedClientHbs); @@ -154,9 +154,19 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { else throw new IllegalArgumentException(); - if (longSockTimeouts) { - disco.setAckTimeout(2000); - disco.setSocketTimeout(2000); + if (useFailureDetectionTimeout()) + cfg.setFailureDetectionTimeout(failureDetectionTimeout()); + else { + if (longSockTimeouts) { + disco.setAckTimeout(2000); + disco.setSocketTimeout(2000); + } + else { + disco.setAckTimeout(gridName.startsWith("client") ? TcpDiscoverySpi.DFLT_ACK_TIMEOUT_CLIENT : + TcpDiscoverySpi.DFLT_ACK_TIMEOUT); + disco.setSocketTimeout(gridName.startsWith("client") ? TcpDiscoverySpi.DFLT_SOCK_TIMEOUT_CLIENT : + TcpDiscoverySpi.DFLT_SOCK_TIMEOUT); + } } disco.setJoinTimeout(joinTimeout); @@ -164,7 +174,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { disco.setClientReconnectDisabled(reconnectDisabled); - disco.afterWrite(afterWrite); + if (disco instanceof TestTcpDiscoverySpi) + ((TestTcpDiscoverySpi)disco).afterWrite(afterWrite); cfg.setDiscoverySpi(disco); @@ -174,6 +185,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { return cfg; } + /** + * Returns TCP Discovery SPI instance to use in a test. + * @return TCP Discovery SPI. + */ + protected TcpDiscoverySpi getDiscoverySpi() { + return new TestTcpDiscoverySpi(); + } + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses(); @@ -205,6 +224,24 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** + * Checks whether to use failure detection timeout instead of setting explicit timeouts. + * + * @return {@code true} if use. + */ + protected boolean useFailureDetectionTimeout() { + return false; + } + + /** + * Gets failure detection timeout to use. + * + * @return Failure detection timeout. + */ + protected long failureDetectionTimeout() { + return 0; + } + + /** * @throws Exception If failed. */ public void testJoinTimeout() throws Exception { @@ -390,12 +427,12 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { final CountDownLatch latch = new CountDownLatch(1); - ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() { + ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure + <Socket>() { @Override public void apply(Socket sock) { try { latch.await(); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new RuntimeException(e); } } @@ -414,11 +451,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { startServerNodes(2); startClientNodes(1); + checkNodes(2, 1); + Ignite srv0 = G.ignite("server-0"); Ignite srv1 = G.ignite("server-1"); Ignite client = G.ignite("client-0"); - ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000); + if (!useFailureDetectionTimeout()) + ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000); ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).pauseSocketWrite(); @@ -756,8 +796,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { @Override public void apply(TcpDiscoveryAbstractMessage msg) { try { Thread.sleep(1000000); - } - catch (InterruptedException ignored) { + } catch (InterruptedException ignored) { Thread.interrupted(); } } @@ -1405,8 +1444,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { latch.countDown(); - assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); - assertTrue(reconnectLatch.await(10_000, MILLISECONDS)); + assertTrue(disconnectLatch.await(awaitTime(), MILLISECONDS)); + assertTrue(reconnectLatch.await(awaitTime(), MILLISECONDS)); clientNodeIds.add(client.cluster().localNode().id()); @@ -1474,7 +1513,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @param failSrv If {@code true} fails server, otherwise server does not send join message. * @throws Exception If failed. */ - private void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception { + protected void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception { netTimeout = 4000; joinTimeout = 5000; @@ -1542,9 +1581,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { clientSpi.brakeConnection(); } - assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + assertTrue(disconnectLatch.await(awaitTime(), MILLISECONDS)); - assertTrue(segmentedLatch.await(10_000, MILLISECONDS)); + assertTrue(segmentedLatch.await(awaitTime(), MILLISECONDS)); waitSegmented(client); @@ -1557,7 +1596,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { @Override public boolean apply() { return srv.cluster().nodes().size() == 1; } - }, 10_000); + }, awaitTime()); checkNodes(1, 0); } @@ -1614,7 +1653,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { srv.close(); - assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + assertTrue(disconnectLatch.await(awaitTime(), MILLISECONDS)); srvNodeIds.clear(); srvIdx.set(0); @@ -1625,7 +1664,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { startServerNodes(1); - assertTrue(reconnectLatch.await(10_000, MILLISECONDS)); + assertTrue(reconnectLatch.await(awaitTime(), MILLISECONDS)); clientNodeIds.clear(); clientNodeIds.add(client.cluster().localNode().id()); @@ -1695,7 +1734,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { clientSpi.brakeConnection(); - assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + assertTrue(disconnectLatch.await(awaitTime(), MILLISECONDS)); log.info("Fail client connection2."); @@ -1704,7 +1743,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { clientSpi.brakeConnection(); - assertTrue(reconnectLatch.await(10_000, MILLISECONDS)); + assertTrue(reconnectLatch.await(awaitTime(), MILLISECONDS)); clientNodeIds.clear(); @@ -1715,7 +1754,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { public boolean apply() { return srv.cluster().nodes().size() == 2; } - }, 10_000); + }, awaitTime()); checkNodes(1, 1); @@ -1759,7 +1798,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @param cnt Number of nodes. * @throws Exception In case of error. */ - private void startServerNodes(int cnt) throws Exception { + protected void startServerNodes(int cnt) throws Exception { for (int i = 0; i < cnt; i++) { Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); @@ -1771,7 +1810,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @param cnt Number of nodes. * @throws Exception In case of error. */ - private void startClientNodes(int cnt) throws Exception { + protected void startClientNodes(int cnt) throws Exception { for (int i = 0; i < cnt; i++) { Ignite g = startGrid("client-" + clientIdx.getAndIncrement()); @@ -1888,7 +1927,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @param srvCnt Number of server nodes. * @param clientCnt Number of client nodes. */ - private void checkNodes(int srvCnt, int clientCnt) { + protected void checkNodes(int srvCnt, int clientCnt) { long topVer = -1; for (int i = 0; i < srvCnt; i++) { @@ -1950,8 +1989,17 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @param latch Latch. * @throws InterruptedException If interrupted. */ - private void await(CountDownLatch latch) throws InterruptedException { - assertTrue("Latch count: " + latch.getCount(), latch.await(10_000, MILLISECONDS)); + protected void await(CountDownLatch latch) throws InterruptedException { + assertTrue("Latch count: " + latch.getCount(), latch.await(awaitTime(), MILLISECONDS)); + } + + /** + * Time to wait for operation completion. + * + * @return Time in milliseconds. + */ + protected long awaitTime() { + return 10_000; } /** @@ -2072,7 +2120,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException { + GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { waitFor(writeLock); boolean fail = false; @@ -2097,17 +2145,18 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { sock.close(); } - super.writeToSocket(sock, msg, bout); + super.writeToSocket(sock, msg, bout, timeout); if (afterWrite != null) afterWrite.apply(msg, sock); } /** {@inheritDoc} */ - @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException { + @Override protected Socket openSocket(InetSocketAddress sockAddr, + IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException { waitFor(openSockLock); - return super.openSocket(sockAddr); + return super.openSocket(sockAddr, new IgniteSpiOperationTimeoutHelper(this)); } /** @@ -2137,7 +2186,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException { + @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout) + throws IOException { if (delayJoinAckFor != null && msg instanceof TcpDiscoveryJoinRequestMessage) { TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg; @@ -2155,7 +2205,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } } - super.writeToSocket(msg, sock, res); + super.writeToSocket(msg, sock, res, timeout); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java index 3e895be..8ab2116 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.tcp; +import org.apache.ignite.configuration.*; import org.apache.ignite.testframework.junits.spi.*; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java new file mode 100644 index 0000000..fbea187 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; + +import java.io.*; +import java.net.*; + +/** + * + */ +public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelfTest { + /** */ + private static final int SPI_COUNT = 6; + + /** */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected int getSpiCount() { + return SPI_COUNT; + } + + /** {@inheritDoc} */ + @Override protected DiscoverySpi getSpi(int idx) { + TestTcpDiscoverySpi spi = new TestTcpDiscoverySpi(); + + spi.setMetricsProvider(createMetricsProvider()); + spi.setIpFinder(ipFinder); + + switch (idx) { + case 0: + case 1: + // Ignore + break; + case 2: + spi.setAckTimeout(3000); + break; + case 3: + spi.setSocketTimeout(4000); + break; + case 4: + spi.setReconnectCount(4); + break; + case 5: + spi.setMaxAckTimeout(10000); + break; + default: + assert false; + } + + return spi; + } + + /** + * @throws Exception In case of error. + */ + public void testFailureDetectionTimeoutEnabled() throws Exception { + assertTrue(firstSpi().failureDetectionTimeoutEnabled()); + assertTrue(secondSpi().failureDetectionTimeoutEnabled()); + + assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT.longValue(), + firstSpi().failureDetectionTimeout()); + assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT.longValue(), + secondSpi().failureDetectionTimeout()); + } + + /** + * @throws Exception In case of error. + */ + public void testFailureDetectionTimeoutDisabled() throws Exception { + for (int i = 2; i < spis.size(); i++) { + assertFalse(((TcpDiscoverySpi)spis.get(i)).failureDetectionTimeoutEnabled()); + assertEquals(0, ((TcpDiscoverySpi)spis.get(i)).failureDetectionTimeout()); + } + } + + /** + * @throws Exception In case of error. + */ + public void testFailureDetectionOnSocketOpen() throws Exception { + try { + ClusterNode node = secondSpi().getLocalNode(); + + firstSpi().openSocketTimeout = true; + + assertFalse(firstSpi().pingNode(node.id())); + assertTrue(firstSpi().validTimeout); + assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeout")); + + firstSpi().openSocketTimeout = false; + firstSpi().openSocketTimeoutWait = true; + + assertFalse(firstSpi().pingNode(node.id())); + assertTrue(firstSpi().validTimeout); + assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeoutWait")); + } + finally { + firstSpi().resetState(); + } + } + + + /** + * @throws Exception In case of error. + */ + public void testFailureDetectionOnSocketWrite() throws Exception { + try { + ClusterNode node = secondSpi().getLocalNode(); + + firstSpi().writeToSocketTimeoutWait = true; + + assertFalse(firstSpi().pingNode(node.id())); + assertTrue(firstSpi().validTimeout); + + firstSpi().writeToSocketTimeoutWait = false; + + assertTrue(firstSpi().pingNode(node.id())); + assertTrue(firstSpi().validTimeout); + } + finally { + firstSpi().resetState(); + } + } + + /** + * @throws Exception In case of error. + */ + public void testConnectionCheckMessage() throws Exception { + TestTcpDiscoverySpi nextSpi = null; + + try { + assert firstSpi().connCheckStatusMsgCntSent == 0; + + TcpDiscoveryNode nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode(); + + assertNotNull(nextNode); + + nextSpi = null; + + for (int i = 1; i < spis.size(); i++) + if (spis.get(i).getLocalNode().id().equals(nextNode.id())) { + nextSpi = (TestTcpDiscoverySpi)spis.get(i); + break; + } + + assertNotNull(nextSpi); + + assert nextSpi.connCheckStatusMsgCntReceived == 0; + + firstSpi().countConnCheckMsg = true; + nextSpi.countConnCheckMsg = true; + + Thread.sleep(firstSpi().failureDetectionTimeout()); + + firstSpi().countConnCheckMsg = false; + nextSpi.countConnCheckMsg = false; + + int sent = firstSpi().connCheckStatusMsgCntSent; + int received = nextSpi.connCheckStatusMsgCntReceived; + + assert sent >= 3 && sent < 7 : "messages sent: " + sent; + assert received >= 3 && received < 7 : "messages received: " + received; + } + finally { + firstSpi().resetState(); + + if (nextSpi != null) + nextSpi.resetState(); + } + } + + /** + * @throws Exception In case of error. + */ + public void testConnectionCheckMessageBackwardCompatibility() throws Exception { + TestTcpDiscoverySpi nextSpi = null; + TcpDiscoveryNode nextNode = null; + + IgniteProductVersion nextNodeVer = null; + + try { + assert firstSpi().connCheckStatusMsgCntSent == 0; + + nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode(); + + assertNotNull(nextNode); + + nextSpi = null; + + for (int i = 1; i < spis.size(); i++) + if (spis.get(i).getLocalNode().id().equals(nextNode.id())) { + nextSpi = (TestTcpDiscoverySpi)spis.get(i); + break; + } + + assertNotNull(nextSpi); + + assert nextSpi.connCheckStatusMsgCntReceived == 0; + + nextNodeVer = nextNode.version(); + + // Overriding the version of the next node. Connection check message must not been sent to it. + nextNode.version(new IgniteProductVersion(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER, + (byte)(TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER - 1), TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER, + 0l, null)); + + firstSpi().countConnCheckMsg = true; + nextSpi.countConnCheckMsg = true; + + Thread.sleep(firstSpi().failureDetectionTimeout() / 2); + + firstSpi().countConnCheckMsg = false; + nextSpi.countConnCheckMsg = false; + + int sent = firstSpi().connCheckStatusMsgCntSent; + int received = nextSpi.connCheckStatusMsgCntReceived; + + assert sent == 0 : "messages sent: " + sent; + assert received == 0 : "messages received: " + received; + } + finally { + firstSpi().resetState(); + + if (nextSpi != null) + nextSpi.resetState(); + + if (nextNode != null && nextNodeVer != null) + nextNode.version(nextNodeVer); + } + } + + /** + * Returns the first spi with failure detection timeout enabled. + * + * @return SPI. + */ + private TestTcpDiscoverySpi firstSpi() { + return (TestTcpDiscoverySpi)spis.get(0); + } + + + /** + * Returns the second spi with failure detection timeout enabled. + * + * @return SPI. + */ + private TestTcpDiscoverySpi secondSpi() { + return (TestTcpDiscoverySpi)spis.get(1); + } + + /** + * + */ + private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + private volatile boolean openSocketTimeout; + + /** */ + private volatile boolean openSocketTimeoutWait; + + /** */ + private volatile boolean writeToSocketTimeoutWait; + + /** */ + private volatile boolean countConnCheckMsg; + + /** */ + private volatile int connCheckStatusMsgCntSent; + + /** */ + private volatile int connCheckStatusMsgCntReceived; + + /** */ + private volatile boolean validTimeout = true; + + /** */ + private volatile IgniteSpiOperationTimeoutException err; + + + /** {@inheritDoc} */ + @Override protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) + throws IOException, IgniteSpiOperationTimeoutException { + + if (openSocketTimeout) { + err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeout"); + throw err; + } + else if (openSocketTimeoutWait) { + long timeout = timeoutHelper.nextTimeoutChunk(0); + + try { + Thread.sleep(timeout + 1000); + } + catch (InterruptedException e) { + // Ignore + } + + try { + timeoutHelper.nextTimeoutChunk(0); + } + catch (IgniteSpiOperationTimeoutException e) { + throw (err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeoutWait")); + } + } + + Socket sock = super.openSocket(sockAddr, timeoutHelper); + + try { + Thread.sleep(1500); + } catch (InterruptedException e) { + // Ignore + } + + return sock; + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) + throws IOException, IgniteCheckedException { + if (!(msg instanceof TcpDiscoveryPingRequest)) { + super.writeToSocket(sock, msg, timeout); + return; + } + + if (timeout >= IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT) { + validTimeout = false; + + throw new IgniteCheckedException("Invalid timeout: " + timeout); + } + + if (writeToSocketTimeoutWait) { + try { + Thread.sleep(timeout); + } + catch (InterruptedException e) { + // Ignore + } + } + else + super.writeToSocket(sock, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, + GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) + connCheckStatusMsgCntSent++; + + super.writeToSocket(sock, msg, bout, timeout); + } + + /** {@inheritDoc} */ + protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout) + throws IOException { + if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) + connCheckStatusMsgCntReceived++; + + super.writeToSocket(msg, sock, res, timeout); + } + + /** + * + */ + private void resetState() { + openSocketTimeout = false; + openSocketTimeoutWait = false; + writeToSocketTimeoutWait = false; + err = null; + validTimeout = true; + connCheckStatusMsgCntSent = 0; + connCheckStatusMsgCntReceived = 0; + countConnCheckMsg = false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java index ff86bda..3f71d7d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java @@ -45,6 +45,9 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedSelfTest.class)); suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedShmemTest.class)); + suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.class)); + suite.addTest(new TestSuite(GridTcpCommunicationSpiTcpFailureDetectionSelfTest.class)); + suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class)); return suite; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index b7014ad..d77c432 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -44,6 +44,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpDiscoverySelfTest.class)); suite.addTest(new TestSuite(TcpDiscoverySpiSelfTest.class)); + suite.addTest(new TestSuite(TcpDiscoverySpiFailureTimeoutSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoverySpiStartStopSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoverySpiConfigSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoveryMarshallerCheckSelfTest.class)); @@ -54,6 +55,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class)); suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class)); suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class)); + suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureTimeoutSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoveryNodeConfigConsistentIdSelfTest.class));