Repository: incubator-ignite Updated Branches: refs/heads/ignite-sprint-5 9a16d1906 -> 3c0046e8e
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java deleted file mode 100644 index ddbea0c..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ /dev/null @@ -1,1185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.io.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.jdk.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.apache.ignite.spi.discovery.*; -import org.apache.ignite.spi.discovery.tcp.internal.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.messages.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Base class for TCP discovery SPIs. - */ -public abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements DiscoverySpi { - /** Default port to listen (value is <tt>47500</tt>). */ - public static final int DFLT_PORT = 47500; - - /** Default timeout for joining topology (value is <tt>0</tt>). */ - public static final long DFLT_JOIN_TIMEOUT = 0; - - /** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */ - public static final long DFLT_NETWORK_TIMEOUT = 5000; - - /** Default value for thread priority (value is <tt>10</tt>). */ - public static final int DFLT_THREAD_PRI = 10; - - /** Default heartbeat messages issuing frequency (value is <tt>100ms</tt>). */ - public static final long DFLT_HEARTBEAT_FREQ = 100; - - /** Default size of topology snapshots history. */ - public static final int DFLT_TOP_HISTORY_SIZE = 1000; - - /** Response OK. */ - protected static final int RES_OK = 1; - - /** Response CONTINUE JOIN. */ - protected static final int RES_CONTINUE_JOIN = 100; - - /** Response WAIT. */ - protected static final int RES_WAIT = 200; - - /** Local address. */ - protected String locAddr; - - /** IP finder. */ - protected TcpDiscoveryIpFinder ipFinder; - - /** Socket operations timeout. */ - protected long sockTimeout; // Must be initialized in the constructor of child class. - - /** Message acknowledgement timeout. */ - protected long ackTimeout; // Must be initialized in the constructor of child class. - - /** Network timeout. */ - protected long netTimeout = DFLT_NETWORK_TIMEOUT; - - /** Join timeout. */ - @SuppressWarnings("RedundantFieldInitialization") - protected long joinTimeout = DFLT_JOIN_TIMEOUT; - - /** Thread priority for all threads started by SPI. */ - protected int threadPri = DFLT_THREAD_PRI; - - /** Heartbeat messages issuing frequency. */ - protected long hbFreq = DFLT_HEARTBEAT_FREQ; - - /** Size of topology snapshots history. */ - protected int topHistSize = DFLT_TOP_HISTORY_SIZE; - - /** Grid discovery listener. */ - protected volatile DiscoverySpiListener lsnr; - - /** Data exchange. */ - protected DiscoverySpiDataExchange exchange; - - /** Metrics provider. */ - protected DiscoveryMetricsProvider metricsProvider; - - /** Local node attributes. */ - protected Map<String, Object> locNodeAttrs; - - /** Local node version. */ - protected IgniteProductVersion locNodeVer; - - /** Local node. */ - protected TcpDiscoveryNode locNode; - - /** Local host. */ - protected InetAddress locHost; - - /** Internal and external addresses of local node. */ - protected Collection<InetSocketAddress> locNodeAddrs; - - /** Socket timeout worker. */ - protected SocketTimeoutWorker sockTimeoutWorker; - - /** Start time of the very first grid node. */ - protected volatile long gridStartTime; - - /** Marshaller. */ - protected final Marshaller marsh = new JdkMarshaller(); - - /** Statistics. */ - protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics(); - - /** Logger. */ - @LoggerResource - protected IgniteLogger log; - - /** - * Check parameters set to this SPI. - */ - protected void checkParameters() { - assertParameter(ipFinder != null, "ipFinder != null"); - assertParameter(hbFreq > 0, "heartbeatFreq > 0"); - assertParameter(netTimeout > 0, "networkTimeout > 0"); - assertParameter(sockTimeout > 0, "sockTimeout > 0"); - assertParameter(ackTimeout > 0, "ackTimeout > 0"); - } - - /** - * Inject resources - * - * @param ignite Ignite. - */ - @IgniteInstanceResource - @Override protected void injectResources(Ignite ignite) { - super.injectResources(ignite); - - // Inject resource. - if (ignite != null) - setLocalAddress(ignite.configuration().getLocalHost()); - } - - /** - * Sets local host IP address that discovery SPI uses. - * <p> - * If not provided, by default a first found non-loopback address - * will be used. If there is no non-loopback address available, - * then {@link InetAddress#getLocalHost()} will be used. - * - * @param locAddr IP address. - */ - @IgniteSpiConfiguration(optional = true) - public void setLocalAddress(String locAddr) { - // Injection should not override value already set by Spring or user. - if (this.locAddr == null) - this.locAddr = locAddr; - } - - /** - * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method. - * - * @return local address. - */ - public String getLocalAddress() { - return locAddr; - } - - /** - * Gets IP finder for IP addresses sharing and storing. - * - * @return IP finder for IP addresses sharing and storing. - */ - public TcpDiscoveryIpFinder getIpFinder() { - return ipFinder; - } - - /** - * Sets IP finder for IP addresses sharing and storing. - * <p> - * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default. - * - * @param ipFinder IP finder. - */ - @IgniteSpiConfiguration(optional = true) - public void setIpFinder(TcpDiscoveryIpFinder ipFinder) { - this.ipFinder = ipFinder; - } - - /** - * Sets socket operations timeout. This timeout is used to limit connection time and - * write-to-socket time. - * <p> - * Note that when running Ignite on Amazon EC2, socket timeout must be set to a value - * significantly greater than the default (e.g. to {@code 30000}). - * <p> - * If not specified, default is {@link TcpDiscoverySpi#DFLT_SOCK_TIMEOUT}, - * {@link TcpClientDiscoverySpi#DFLT_SOCK_TIMEOUT}. - * - * @param sockTimeout Socket connection timeout. - */ - @IgniteSpiConfiguration(optional = true) - public void setSocketTimeout(long sockTimeout) { - this.sockTimeout = sockTimeout; - } - - /** - * Sets timeout for receiving acknowledgement for sent message. - * <p> - * If acknowledgement is not received within this timeout, sending is considered as failed - * and SPI tries to repeat message sending. - * <p> - * If not specified, default is {@link TcpDiscoverySpi#DFLT_ACK_TIMEOUT}, - * {@link TcpClientDiscoverySpi#DFLT_ACK_TIMEOUT}. - * - * @param ackTimeout Acknowledgement timeout. - */ - @IgniteSpiConfiguration(optional = true) - public void setAckTimeout(long ackTimeout) { - this.ackTimeout = ackTimeout; - } - - /** - * Sets maximum network timeout to use for network operations. - * <p> - * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}. - * - * @param netTimeout Network timeout. - */ - @IgniteSpiConfiguration(optional = true) - public void setNetworkTimeout(long netTimeout) { - this.netTimeout = netTimeout; - } - - /** - * Join timeout. - * <p> - * If non-shared IP finder is used and node fails to connect to - * any address from IP finder, node keeps trying to join within this - * timeout. If all addresses are still unresponsive, exception is thrown - * and node startup fails. - * @return Join timeout in milliseconds, ({@code 0} means wait forever). - */ - public long getJoinTimeout() { - return joinTimeout; - } - - /** - * Sets join timeout. - * <p> - * If non-shared IP finder is used and node fails to connect to - * any address from IP finder, node keeps trying to join within this - * timeout. If all addresses are still unresponsive, exception is thrown - * and node startup fails. - * <p> - * If not specified, default is {@link #DFLT_JOIN_TIMEOUT}. - * - * @param joinTimeout Join timeout ({@code 0} means wait forever). - * - * @see TcpDiscoveryIpFinder#isShared() - */ - @IgniteSpiConfiguration(optional = true) - public void setJoinTimeout(long joinTimeout) { - this.joinTimeout = joinTimeout; - } - - /** - * Sets thread priority. All threads within SPI will be started with it. - * <p> - * If not provided, default value is {@link #DFLT_THREAD_PRI} - * - * @param threadPri Thread priority. - */ - @IgniteSpiConfiguration(optional = true) - public void setThreadPriority(int threadPri) { - this.threadPri = threadPri; - } - - /** - * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages - * in configurable time interval to other nodes to notify them about its state. - * <p> - * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}. - * - * @param hbFreq Heartbeat frequency in milliseconds. - */ - @IgniteSpiConfiguration(optional = true) - public void setHeartbeatFrequency(long hbFreq) { - this.hbFreq = hbFreq; - } - - /** - * @return Size of topology snapshots history. - */ - public long getTopHistorySize() { - return topHistSize; - } - - /** - * Sets size of topology snapshots history. Specified size should be greater than or equal to default size - * {@link #DFLT_TOP_HISTORY_SIZE}. - * - * @param topHistSize Size of topology snapshots history. - */ - @IgniteSpiConfiguration(optional = true) - public void setTopHistorySize(int topHistSize) { - if (topHistSize < DFLT_TOP_HISTORY_SIZE) { - U.warn(log, "Topology history size should be greater than or equal to default size. " + - "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize + - ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']'); - - return; - } - - this.topHistSize = topHistSize; - } - - /** {@inheritDoc} */ - @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) { - assert locNodeAttrs == null; - assert locNodeVer == null; - - if (log.isDebugEnabled()) { - log.debug("Node attributes to set: " + attrs); - log.debug("Node version to set: " + ver); - } - - locNodeAttrs = attrs; - locNodeVer = ver; - } - - /** {@inheritDoc} */ - @Override public Collection<Object> injectables() { - return F.<Object>asList(ipFinder); - } - - /** - * Gets socket timeout. - * - * @return Socket timeout. - */ - public long getSocketTimeout() { - return sockTimeout; - } - - /** - * Gets message acknowledgement timeout. - * - * @return Message acknowledgement timeout. - */ - public long getAckTimeout() { - return ackTimeout; - } - - /** - * Gets network timeout. - * - * @return Network timeout. - */ - public long getNetworkTimeout() { - return netTimeout; - } - - /** - * Gets thread priority. All threads within SPI will be started with it. - * - * @return Thread priority. - */ - public int getThreadPriority() { - return threadPri; - } - - /** - * Gets delay between heartbeat messages sent by coordinator. - * - * @return Time period in milliseconds. - */ - public long getHeartbeatFrequency() { - return hbFreq; - } - - /** - * Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation). - * - * @return IPFinder (string representation). - */ - public String getIpFinderFormatted() { - return ipFinder.toString(); - } - - /** - * Gets joined nodes count. - * - * @return Nodes joined count. - */ - public long getNodesJoined() { - return stats.joinedNodesCount(); - } - - /** - * Gets left nodes count. - * - * @return Left nodes count. - */ - public long getNodesLeft() { - return stats.leftNodesCount(); - } - - /** - * Gets failed nodes count. - * - * @return Failed nodes count. - */ - public long getNodesFailed() { - return stats.failedNodesCount(); - } - - /** - * Gets pending messages registered count. - * - * @return Pending messages registered count. - */ - public long getPendingMessagesRegistered() { - return stats.pendingMessagesRegistered(); - } - - /** - * Gets pending messages discarded count. - * - * @return Pending messages registered count. - */ - public long getPendingMessagesDiscarded() { - return stats.pendingMessagesDiscarded(); - } - - /** - * Gets avg message processing time. - * - * @return Avg message processing time. - */ - public long getAvgMessageProcessingTime() { - return stats.avgMessageProcessingTime(); - } - - /** - * Gets max message processing time. - * - * @return Max message processing time. - */ - public long getMaxMessageProcessingTime() { - return stats.maxMessageProcessingTime(); - } - - /** - * Gets total received messages count. - * - * @return Total received messages count. - */ - public int getTotalReceivedMessages() { - return stats.totalReceivedMessages(); - } - - /** - * Gets received messages counts (grouped by type). - * - * @return Map containing message types and respective counts. - */ - public Map<String, Integer> getReceivedMessages() { - return stats.receivedMessages(); - } - - /** - * Gets total processed messages count. - * - * @return Total processed messages count. - */ - public int getTotalProcessedMessages() { - return stats.totalProcessedMessages(); - } - - /** - * Gets processed messages counts (grouped by type). - * - * @return Map containing message types and respective counts. - */ - public Map<String, Integer> getProcessedMessages() { - return stats.processedMessages(); - } - - /** - * Gets time local node has been coordinator since. - * - * @return Time local node is coordinator since. - */ - public long getCoordinatorSinceTimestamp() { - return stats.coordinatorSinceTimestamp(); - } - - /** {@inheritDoc} */ - @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { - super.onContextInitialized0(spiCtx); - - ipFinder.onSpiContextInitialized(spiCtx); - } - - /** {@inheritDoc} */ - @Override protected void onContextDestroyed0() { - super.onContextDestroyed0(); - - if (ipFinder != null) - ipFinder.onSpiContextDestroyed(); - } - - /** {@inheritDoc} */ - @Override public ClusterNode getLocalNode() { - return locNode; - } - - /** {@inheritDoc} */ - @Override public void setListener(@Nullable DiscoverySpiListener lsnr) { - this.lsnr = lsnr; - } - - /** {@inheritDoc} */ - @Override public void setDataExchange(DiscoverySpiDataExchange exchange) { - this.exchange = exchange; - } - - /** {@inheritDoc} */ - @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) { - this.metricsProvider = metricsProvider; - } - - /** {@inheritDoc} */ - @Override public long getGridStartTime() { - assert gridStartTime != 0; - - return gridStartTime; - } - - /** - * @param sockAddr Remote address. - * @return Opened socket. - * @throws IOException If failed. - */ - protected Socket openSocket(InetSocketAddress sockAddr) throws IOException { - assert sockAddr != null; - - InetSocketAddress resolved = sockAddr.isUnresolved() ? - new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr; - - InetAddress addr = resolved.getAddress(); - - assert addr != null; - - Socket sock = new Socket(); - - sock.bind(new InetSocketAddress(locHost, 0)); - - sock.setTcpNoDelay(true); - - sock.connect(resolved, (int)sockTimeout); - - writeToSocket(sock, U.IGNITE_HEADER); - - return sock; - } - - /** - * Writes message to the socket. - * - * @param sock Socket. - * @param data Raw data to write. - * @throws IOException If IO failed or write timed out. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, byte[] data) throws IOException { - assert sock != null; - assert data != null; - - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - - sockTimeoutWorker.addTimeoutObject(obj); - - IOException err = null; - - try { - OutputStream out = sock.getOutputStream(); - - out.write(data); - - out.flush(); - } - catch (IOException e) { - err = e; - } - finally { - boolean cancelled = obj.cancel(); - - if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); - - // Throw original exception. - if (err != null) - throw err; - - if (!cancelled) - throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); - } - } - - /** - * Writes message to the socket. - * - * @param sock Socket. - * @param msg Message. - * @throws IOException If IO failed or write timed out. - * @throws IgniteCheckedException If marshalling failed. - */ - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, IgniteCheckedException { - writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K. - } - - /** - * Writes message to the socket. - * - * @param sock Socket. - * @param msg Message. - * @param bout Byte array output stream. - * @throws IOException If IO failed or write timed out. - * @throws IgniteCheckedException If marshalling failed. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout) - throws IOException, IgniteCheckedException { - assert sock != null; - assert msg != null; - assert bout != null; - - // Marshall message first to perform only write after. - marsh.marshal(msg, bout); - - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - - sockTimeoutWorker.addTimeoutObject(obj); - - IOException err = null; - - try { - OutputStream out = sock.getOutputStream(); - - bout.writeTo(out); - - out.flush(); - } - catch (IOException e) { - err = e; - } - finally { - boolean cancelled = obj.cancel(); - - if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); - - // Throw original exception. - if (err != null) - throw err; - - if (!cancelled) - throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); - } - } - - /** - * Writes response to the socket. - * - * @param sock Socket. - * @param res Integer response. - * @throws IOException If IO failed or write timed out. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, int res) throws IOException { - assert sock != null; - - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - - sockTimeoutWorker.addTimeoutObject(obj); - - OutputStream out = sock.getOutputStream(); - - IOException err = null; - - try { - out.write(res); - - out.flush(); - } - catch (IOException e) { - err = e; - } - finally { - boolean cancelled = obj.cancel(); - - if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); - - // Throw original exception. - if (err != null) - throw err; - - if (!cancelled) - throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); - } - } - - /** - * Reads message from the socket limiting read time. - * - * @param sock Socket. - * @param in Input stream (in case socket stream was wrapped). - * @param timeout Socket timeout for this operation. - * @return Message. - * @throws IOException If IO failed or read timed out. - * @throws IgniteCheckedException If unmarshalling failed. - */ - protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException { - assert sock != null; - - int oldTimeout = sock.getSoTimeout(); - - try { - sock.setSoTimeout((int)timeout); - - return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader()); - } - catch (IOException | IgniteCheckedException e) { - if (X.hasCause(e, SocketTimeoutException.class)) - LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " + - "in long GC pauses on remote node. Current timeout: " + timeout + '.'); - - throw e; - } - finally { - // Quietly restore timeout. - try { - sock.setSoTimeout(oldTimeout); - } - catch (SocketException ignored) { - // No-op. - } - } - } - - /** - * Reads message delivery receipt from the socket. - * - * @param sock Socket. - * @param timeout Socket timeout for this operation. - * @return Receipt. - * @throws IOException If IO failed or read timed out. - */ - protected int readReceipt(Socket sock, long timeout) throws IOException { - assert sock != null; - - int oldTimeout = sock.getSoTimeout(); - - try { - sock.setSoTimeout((int)timeout); - - int res = sock.getInputStream().read(); - - if (res == -1) - throw new EOFException(); - - return res; - } - catch (SocketTimeoutException e) { - LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " + - "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " + - "configuration property). Will retry to send message with increased timeout. " + - "Current timeout: " + timeout + '.'); - - stats.onAckTimeout(); - - throw e; - } - finally { - // Quietly restore timeout. - try { - sock.setSoTimeout(oldTimeout); - } - catch (SocketException ignored) { - // No-op. - } - } - } - - /** - * Resolves addresses registered in the IP finder, removes duplicates and local host - * address and returns the collection of. - * - * @return Resolved addresses without duplicates and local address (potentially - * empty but never null). - * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. - */ - protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException { - List<InetSocketAddress> res = new ArrayList<>(); - - Collection<InetSocketAddress> addrs; - - // Get consistent addresses collection. - while (true) { - try { - addrs = registeredAddresses(); - - break; - } - catch (IgniteSpiException e) { - LT.error(log, e, "Failed to get registered addresses from IP finder on start " + - "(retrying every 2000 ms)."); - } - - try { - U.sleep(2000); - } - catch (IgniteInterruptedCheckedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - } - - for (InetSocketAddress addr : addrs) { - assert addr != null; - - try { - InetSocketAddress resolved = addr.isUnresolved() ? - new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr; - - if (locNodeAddrs == null || !locNodeAddrs.contains(resolved)) - res.add(resolved); - } - catch (UnknownHostException ignored) { - LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr); - - // Add address in any case. - res.add(addr); - } - } - - if (!res.isEmpty()) - Collections.shuffle(res); - - return res; - } - - /** - * Gets addresses registered in the IP finder, initializes addresses having no - * port (or 0 port) with {@link #DFLT_PORT}. - * - * @return Registered addresses. - * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. - */ - protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException { - Collection<InetSocketAddress> res = new ArrayList<>(); - - for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) { - if (addr.getPort() == 0) { - // TcpDiscoveryNode.discoveryPort() returns an correct port for a server node and 0 for client node. - int port = locNode.discoveryPort() != 0 ? locNode.discoveryPort() : DFLT_PORT; - - addr = addr.isUnresolved() ? new InetSocketAddress(addr.getHostName(), port) : - new InetSocketAddress(addr.getAddress(), port); - } - - res.add(addr); - } - - return res; - } - - /** - * @param msg Message. - * @return Error. - */ - protected IgniteSpiException duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) { - assert msg != null; - - return new IgniteSpiException("Local node has the same ID as existing node in topology " + - "(fix configuration and restart local node) [localNode=" + locNode + - ", existingNode=" + msg.node() + ']'); - } - - /** - * @param msg Message. - * @return Error. - */ - protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMessage msg) { - assert msg != null; - - return new IgniteSpiException(new IgniteAuthenticationException("Authentication failed [nodeId=" + - msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']')); - } - - /** - * @param msg Message. - * @return Error. - */ - protected IgniteSpiException checkFailedError(TcpDiscoveryCheckFailedMessage msg) { - assert msg != null; - - return versionCheckFailed(msg) ? new IgniteSpiVersionCheckException(msg.error()) : - new IgniteSpiException(msg.error()); - } - - /** - * @param msg Message. - * @return Whether delivery of the message is ensured. - */ - protected boolean ensured(TcpDiscoveryAbstractMessage msg) { - return U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class) != null; - } - - /** - * @param msg Failed message. - * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise. - * @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it - * and create separate message for failed version check with next major release. - */ - @Deprecated - private static boolean versionCheckFailed(TcpDiscoveryCheckFailedMessage msg) { - return msg.error().contains("versions are not compatible"); - } - - /** - * @param nodeId Node ID. - * @return Marshalled exchange data. - */ - protected Map<Integer, byte[]> collectExchangeData(UUID nodeId) { - Map<Integer, Serializable> data = exchange.collect(nodeId); - - if (data == null) - return null; - - Map<Integer, byte[]> data0 = U.newHashMap(data.size()); - - for (Map.Entry<Integer, Serializable> entry : data.entrySet()) { - try { - byte[] bytes = marsh.marshal(entry.getValue()); - - data0.put(entry.getKey(), bytes); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal discovery data " + - "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e); - } - } - - return data0; - } - - /** - * @param joiningNodeID Joining node ID. - * @param nodeId Remote node ID for which data is provided. - * @param data Collection of marshalled discovery data objects from different components. - * @param clsLdr Class loader for discovery data unmarshalling. - */ - protected void onExchange(UUID joiningNodeID, - UUID nodeId, - Map<Integer, byte[]> data, - ClassLoader clsLdr) - { - Map<Integer, Serializable> data0 = U.newHashMap(data.size()); - - for (Map.Entry<Integer, byte[]> entry : data.entrySet()) { - try { - Serializable compData = marsh.unmarshal(entry.getValue(), clsLdr); - - data0.put(entry.getKey(), compData); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal discovery data for component: " + entry.getKey(), e); - } - } - - exchange.onExchange(joiningNodeID, nodeId, data0); - } - - /** - * Handles sockets timeouts. - */ - protected class SocketTimeoutWorker extends IgniteSpiThread { - /** Time-based sorted set for timeout objects. */ - private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs = - new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() { - @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) { - int res = Long.compare(o1.endTime(), o2.endTime()); - - if (res != 0) - return res; - - return Long.compare(o1.id(), o2.id()); - } - }); - - /** Mutex. */ - private final Object mux0 = new Object(); - - /** - * - */ - SocketTimeoutWorker() { - super(gridName, "tcp-disco-sock-timeout-worker", log); - - setPriority(threadPri); - } - - /** - * @param timeoutObj Timeout object to add. - */ - @SuppressWarnings({"NakedNotify"}) - public void addTimeoutObject(SocketTimeoutObject timeoutObj) { - assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE; - - timeoutObjs.add(timeoutObj); - - if (timeoutObjs.firstx() == timeoutObj) { - synchronized (mux0) { - mux0.notifyAll(); - } - } - } - - /** - * @param timeoutObj Timeout object to remove. - */ - public void removeTimeoutObject(SocketTimeoutObject timeoutObj) { - assert timeoutObj != null; - - timeoutObjs.remove(timeoutObj); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Socket timeout worker has been started."); - - while (!isInterrupted()) { - long now = U.currentTimeMillis(); - - for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) { - SocketTimeoutObject timeoutObj = iter.next(); - - if (timeoutObj.endTime() <= now) { - iter.remove(); - - if (timeoutObj.onTimeout()) { - LT.warn(log, null, "Socket write has timed out (consider increasing " + - "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']'); - - stats.onSocketTimeout(); - } - } - else - break; - } - - synchronized (mux0) { - while (true) { - // Access of the first element must be inside of - // synchronization block, so we don't miss out - // on thread notification events sent from - // 'addTimeoutObject(..)' method. - SocketTimeoutObject first = timeoutObjs.firstx(); - - if (first != null) { - long waitTime = first.endTime() - U.currentTimeMillis(); - - if (waitTime > 0) - mux0.wait(waitTime); - else - break; - } - else - mux0.wait(5000); - } - } - } - } - } - - /** - * Socket timeout object. - */ - private static class SocketTimeoutObject { - /** */ - private static final AtomicLong idGen = new AtomicLong(); - - /** */ - private final long id = idGen.incrementAndGet(); - - /** */ - private final Socket sock; - - /** */ - private final long endTime; - - /** */ - private final AtomicBoolean done = new AtomicBoolean(); - - /** - * @param sock Socket. - * @param endTime End time. - */ - SocketTimeoutObject(Socket sock, long endTime) { - assert sock != null; - assert endTime > 0; - - this.sock = sock; - this.endTime = endTime; - } - - /** - * @return {@code True} if object has not yet been processed. - */ - boolean cancel() { - return done.compareAndSet(false, true); - } - - /** - * @return {@code True} if object has not yet been canceled. - */ - boolean onTimeout() { - if (done.compareAndSet(false, true)) { - // Close socket - timeout occurred. - U.closeQuiet(sock); - - return true; - } - - return false; - } - - /** - * @return End time. - */ - long endTime() { - return endTime; - } - - /** - * @return ID. - */ - long id() { - return id; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(SocketTimeoutObject.class, this); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java index df9d0f4..95281c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java @@ -272,4 +272,12 @@ public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean { */ @MXBeanDescription("Dump debug info.") public void dumpDebugInfo(); + + /** + * Whether or not discovery is in client mode. + * + * @return {@code true} if node is in client mode. + */ + @MXBeanDescription("Client mode.") + public boolean isClientMode(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/internal/GridReleaseTypeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridReleaseTypeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridReleaseTypeSelfTest.java index b2374fb..603f66c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridReleaseTypeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridReleaseTypeSelfTest.java @@ -45,25 +45,33 @@ public class GridReleaseTypeSelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpiAdapter discoSpi; + TcpDiscoverySpi discoSpi; if (clientMode) { - discoSpi = new TcpClientDiscoverySpi() { - @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) { + discoSpi = new TcpDiscoverySpi() { + @Override public TcpDiscoverySpi setNodeAttributes(Map<String, Object> attrs, + IgniteProductVersion ver) { super.setNodeAttributes(attrs, ver); attrs.put(IgniteNodeAttributes.ATTR_BUILD_VER, nodeVer); + + return this; } }; + discoSpi.setClientMode(true); + cfg.setClientMode(true); } else { discoSpi = new TcpDiscoverySpi() { - @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) { + @Override public TcpDiscoverySpi setNodeAttributes(Map<String, Object> attrs, + IgniteProductVersion ver) { super.setNodeAttributes(attrs, ver); attrs.put(IgniteNodeAttributes.ATTR_BUILD_VER, nodeVer); + + return this; } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java index 03fc01b..01c8377 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java @@ -89,20 +89,14 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe cCfg.setRebalanceMode(SYNC); cCfg.setWriteSynchronizationMode(FULL_SYNC); - TcpDiscoverySpiAdapter disc; + TcpDiscoverySpi disc = new TcpDiscoverySpi(); if (clientMode && ((gridName.charAt(gridName.length() - 1) - '0') & 1) != 0) { - disc = new TcpClientDiscoverySpi(); - + disc.setClientMode(true); cfg.setClientMode(true); } - else { - TcpDiscoverySpi srvDisc = new TcpDiscoverySpi(); - - srvDisc.setMaxMissedClientHeartbeats(50); - - disc = srvDisc; - } + else + disc.setMaxMissedClientHeartbeats(50); disc.setHeartbeatFrequency(500); disc.setIpFinder(IP_FINDER); @@ -186,8 +180,7 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe @SuppressWarnings("BusyWait") private void awaitDiscovery(long nodesCnt) throws InterruptedException { for (Ignite g : alive) { - if (g.configuration().getDiscoverySpi() instanceof TcpClientDiscoverySpi) - ((TcpClientDiscoverySpi)g.configuration().getDiscoverySpi()).waitForMessagePrecessed(); + ((TcpDiscoverySpi)g.configuration().getDiscoverySpi()).waitForClientMessagePrecessed(); while (g.cluster().nodes().size() != nodesCnt) Thread.sleep(10); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java index fc953d9..dbe7d72 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java @@ -53,7 +53,7 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA cfg.setDeploymentMode(mode); cfg.setPeerClassLoadingEnabled(p2pEnabled); - TcpDiscoverySpiAdapter discoverySpi = createDiscovery(cfg); + TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi(); discoverySpi.setIpFinder(IP_FINDER); @@ -75,14 +75,6 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA } /** - * @return Discovery SPI. - * @param cfg DiscoverySpi - */ - protected TcpDiscoverySpiAdapter createDiscovery(IgniteConfiguration cfg) { - return new TcpDiscoverySpi(); - } - - /** * @throws Exception If failed. */ public void testPreferIpV4StackTrue() throws Exception { @@ -185,13 +177,14 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA * */ public static class ClientDiscovery extends GridDiscoveryManagerAttributesSelfTest { - /** {@inheritDoc} - * @param cfg*/ - @Override protected TcpDiscoverySpiAdapter createDiscovery(IgniteConfiguration cfg) { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + if (Boolean.TRUE.equals(cfg.isClientMode())) - return new TcpClientDiscoverySpi(); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setClientMode(true); - return super.createDiscovery(cfg); + return cfg; } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java index 5f5bddb..7a4baa8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java @@ -70,7 +70,7 @@ public abstract class GridDiscoveryManagerSelfTest extends GridCommonAbstractTes cfg.setCacheConfiguration(ccfg1, ccfg2); } - TcpDiscoverySpiAdapter discoverySpi = createDiscovery(cfg); + TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi(); discoverySpi.setIpFinder(IP_FINDER); @@ -80,14 +80,6 @@ public abstract class GridDiscoveryManagerSelfTest extends GridCommonAbstractTes } /** - * @return Discovery SPI. - * @param cfg DiscoverySpi - */ - protected TcpDiscoverySpiAdapter createDiscovery(IgniteConfiguration cfg) { - return new TcpDiscoverySpi(); - } - - /** * @throws Exception If failed. */ public void testHasNearCache() throws Exception { @@ -207,13 +199,14 @@ public abstract class GridDiscoveryManagerSelfTest extends GridCommonAbstractTes * */ public static class ClientDiscovery extends GridDiscoveryManagerSelfTest { - /** {@inheritDoc} - * @param cfg*/ - @Override protected TcpDiscoverySpiAdapter createDiscovery(IgniteConfiguration cfg) { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + if (Boolean.TRUE.equals(cfg.isClientMode())) - return new TcpClientDiscoverySpi(); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setClientMode(true); - return super.createDiscovery(cfg); + return cfg; } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTcpClientDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTcpClientDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTcpClientDiscoveryMultiThreadedTest.java index 72fa72e..e22bd57 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTcpClientDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTcpClientDiscoveryMultiThreadedTest.java @@ -32,7 +32,7 @@ import java.util.concurrent.atomic.*; import static org.apache.ignite.cache.CacheMode.*; /** - * Tests {@link TcpClientDiscoverySpi} with multiple client nodes that interact with a cache concurrently. + * Tests {@link TcpDiscoverySpi} in client mode with multiple client nodes that interact with a cache concurrently. */ public class GridCacheTcpClientDiscoveryMultiThreadedTest extends GridCacheAbstractSelfTest { /** Server nodes count. */ @@ -78,10 +78,10 @@ public class GridCacheTcpClientDiscoveryMultiThreadedTest extends GridCacheAbstr clientFinder.setAddresses(addrs); - TcpClientDiscoverySpi discoverySpi = new TcpClientDiscoverySpi(); - discoverySpi.setIpFinder(clientFinder); + cfg.setDiscoverySpi(new TcpDiscoverySpi() + .setClientMode(true) + .setIpFinder(clientFinder)); - cfg.setDiscoverySpi(discoverySpi); cfg.setClientMode(true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java index 21233cc..6aecdad 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java @@ -47,10 +47,10 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA cfg.setClientMode(true); if (clientDiscovery()) - cfg.setDiscoverySpi(new TcpClientDiscoverySpi()); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setClientMode(true); } - ((TcpDiscoverySpiAdapter)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); return cfg; } @@ -265,10 +265,7 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA assertTrue(ignite.configuration().isClientMode()); - if (clientDiscovery()) - assertTrue(ignite.configuration().getDiscoverySpi() instanceof TcpClientDiscoverySpi); - else - assertTrue(ignite.configuration().getDiscoverySpi() instanceof TcpDiscoverySpi); + assertEquals(clientDiscovery(), ignite.configuration().getDiscoverySpi().isClientMode()); return ignite; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesTcpClientDiscoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesTcpClientDiscoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesTcpClientDiscoveryAbstractTest.java index 62dbb18..bd20ddc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesTcpClientDiscoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesTcpClientDiscoveryAbstractTest.java @@ -24,7 +24,7 @@ import org.apache.ignite.spi.discovery.tcp.*; import static org.apache.ignite.cache.CacheMode.*; /** - * Tests {@link TcpClientDiscoverySpi}. + * Tests {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} in client mode. */ @SuppressWarnings("RedundantMethodOverride") public abstract class GridCacheClientModesTcpClientDiscoveryAbstractTest extends GridCacheClientModesAbstractSelfTest { @@ -38,11 +38,9 @@ public abstract class GridCacheClientModesTcpClientDiscoveryAbstractTest extends IgniteConfiguration cfg = super.getConfiguration(gridName); if (cfg.isClientMode() != null && cfg.isClientMode()) { - TcpClientDiscoverySpi discoverySpi = new TcpClientDiscoverySpi(); - - discoverySpi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(discoverySpi); + cfg.setDiscoverySpi(new TcpDiscoverySpi() + .setClientMode(true) + .setIpFinder(ipFinder)); } return cfg; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java index fcc15e0..e7daec3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java @@ -55,7 +55,8 @@ public abstract class GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest // Override node attributes in discovery spi. TcpDiscoverySpi spi = new TcpDiscoverySpi() { - @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) { + @Override public TcpDiscoverySpi setNodeAttributes(Map<String, Object> attrs, + IgniteProductVersion ver) { super.setNodeAttributes(attrs, ver); // Set unique mac addresses for every group of three nodes. @@ -64,6 +65,8 @@ public abstract class GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest attrs.put(IgniteNodeAttributes.ATTR_MACS, macAddrs); gridInstanceNum++; + + return this; } }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java index 9e6fc37..1a6146e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java @@ -58,7 +58,6 @@ public class GridCacheSyncReplicatedPreloadSelfTest extends GridCommonAbstractTe TcpDiscoverySpi disco = new TcpDiscoverySpi(); disco.setIpFinder(ipFinder); - disco.setDebugMode(DISCO_DEBUG_MODE); cfg.setDiscoverySpi(disco); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoveryMarshallerCheckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoveryMarshallerCheckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoveryMarshallerCheckSelfTest.java index a2dee89..e9620ea 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoveryMarshallerCheckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoveryMarshallerCheckSelfTest.java @@ -37,15 +37,12 @@ public class TcpClientDiscoveryMarshallerCheckSelfTest extends GridCommonAbstrac @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpiAdapter discoSpi; - - if (gridName.endsWith("0")) { - discoSpi = new TcpDiscoverySpi(); + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + if (gridName.endsWith("0")) cfg.setMarshaller(new JdkMarshaller()); - } else { - discoSpi = new TcpClientDiscoverySpi(); + discoSpi.setClientMode(true); cfg.setClientMode(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiConfigSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiConfigSelfTest.java deleted file mode 100644 index 3e8b231..0000000 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiConfigSelfTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.tcp; - -import org.apache.ignite.testframework.junits.spi.*; - -/** - * - */ -@GridSpiTest(spi = TcpClientDiscoverySpi.class, group = "Discovery SPI") -public class TcpClientDiscoverySpiConfigSelfTest extends GridSpiAbstractConfigTest<TcpDiscoverySpi> { - /** - * @throws Exception If failed. - */ - public void testNegativeConfig() throws Exception { - checkNegativeSpiProperty(new TcpClientDiscoverySpi(), "ipFinder", null); - checkNegativeSpiProperty(new TcpClientDiscoverySpi(), "networkTimeout", 0); - checkNegativeSpiProperty(new TcpClientDiscoverySpi(), "socketTimeout", 0); - checkNegativeSpiProperty(new TcpClientDiscoverySpi(), "ackTimeout", 0); - checkNegativeSpiProperty(new TcpClientDiscoverySpi(), "heartbeatFrequency", 0); - checkNegativeSpiProperty(new TcpClientDiscoverySpi(), "threadPriority", -1); - checkNegativeSpiProperty(new TcpClientDiscoverySpi(), "joinTimeout", -1); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 8157d59..1655c33 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -96,10 +96,10 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { private TcpDiscoveryVmIpFinder clientIpFinder; /** */ - private long joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT; + private long joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT; /** */ - private long netTimeout = TcpDiscoverySpiAdapter.DFLT_NETWORK_TIMEOUT; + private long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT; /** */ private boolean longSockTimeouts; @@ -108,7 +108,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpiAdapter disco; + TcpDiscoverySpi disco; if (gridName.startsWith("server")) { disco = new TcpDiscoverySpi(); @@ -116,7 +116,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { disco.setIpFinder(IP_FINDER); } else if (gridName.startsWith("client")) { - disco = new TestTcpClientDiscovery(); + disco = new TestTcpDiscoverySpi(); + + disco.setClientMode(true); cfg.setClientMode(true); @@ -186,8 +188,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { nodeId = null; clientIpFinder = null; - joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT; - netTimeout = TcpClientDiscoverySpi.DFLT_NETWORK_TIMEOUT; + joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT; + netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT; longSockTimeouts = false; assert G.allGrids().isEmpty(); @@ -408,14 +410,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { Ignite srv1 = G.ignite("server-1"); Ignite client = G.ignite("client-0"); - ((TcpDiscoverySpiAdapter)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000); + ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000); - ((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).pauseSocketWrite(); + ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).pauseSocketWrite(); assert !((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id()); assert !((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id()); - ((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).resumeAll(); + ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).resumeAll(); assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id()); assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id()); @@ -465,7 +467,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { attachListeners(2, 3); - ((TcpClientDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi()).brakeConnection(); + ((TcpDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi()).brakeConnection(); G.ignite("client-2").message().remoteListen(null, new MessageListener()); // Send some discovery message. @@ -488,7 +490,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { attachListeners(2, 2); - ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll(); + ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll(); stopGrid("server-2"); @@ -502,7 +504,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { clientLeftLatch = new CountDownLatch(1); - ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resumeAll(); + ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).resumeAll(); await(clientLeftLatch); @@ -535,7 +537,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { }; G.addListener(lsnr); - final TcpClientDiscoverySpi client2Disco = (TcpClientDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi(); + final TcpDiscoverySpi disco = (TcpDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi(); try { failServer(2); @@ -551,7 +553,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { G.removeListener(lsnr); } - assert client2Disco.getRemoteNodes().isEmpty(); + assert disco.getRemoteNodes().isEmpty(); } /** @@ -875,8 +877,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @throws Exception In case of error. */ private void setClientRouter(int clientIdx, int srvIdx) throws Exception { - TcpClientDiscoverySpi disco = - (TcpClientDiscoverySpi)G.ignite("client-" + clientIdx).configuration().getDiscoverySpi(); + TcpDiscoverySpi disco = + (TcpDiscoverySpi)G.ignite("client-" + clientIdx).configuration().getDiscoverySpi(); TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder(); @@ -923,7 +925,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @param idx Index. */ private void failClient(int idx) { - ((TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure(); + ((TcpDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure(); } /** @@ -1034,7 +1036,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { for (int i = 0; i < clientCnt; i++) { Ignite g = G.ignite("client-" + i); - ((TcpClientDiscoverySpi)g.configuration().getDiscoverySpi()).waitForMessagePrecessed(); + ((TcpDiscoverySpi)g.configuration().getDiscoverySpi()).waitForClientMessagePrecessed(); assertTrue(clientNodeIds.contains(g.cluster().localNode().id())); @@ -1094,7 +1096,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** * */ - private static class TestTcpClientDiscovery extends TcpClientDiscoverySpi { + private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { /** */ private final Object mux = new Object(); @@ -1162,7 +1164,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { public void pauseAll() { pauseResumeOperation(true, openSockLock, writeLock); - msgWorker.suspend(); + impl.workerThread().suspend(); } /** @@ -1171,7 +1173,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { public void resumeAll() { pauseResumeOperation(false, openSockLock, writeLock); - msgWorker.resume(); + impl.workerThread().resume(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java index d8512e3..c64b5a8 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java @@ -44,7 +44,9 @@ public class TcpDiscoveryConcurrentStartTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpiAdapter discoSpi = client ? new TcpClientDiscoverySpi() : new TcpDiscoverySpi(); + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setClientMode(client); discoSpi.setIpFinder(ipFinder); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 6509a6d..8c287b1 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -71,13 +71,11 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(gridName); if (client()) { - TcpClientDiscoverySpi spi = new TcpClientDiscoverySpi(); - - spi.setIpFinder(ipFinder); - cfg.setClientMode(true); - cfg.setDiscoverySpi(spi); + cfg.setDiscoverySpi(new TcpDiscoverySpi() + .setClientMode(true) + .setIpFinder(ipFinder)); } else { TcpDiscoverySpi spi = new TcpDiscoverySpi(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 5648c31..ad12753 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -175,7 +175,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } }, 4, "grid-starter"); - Collection<TcpDiscoveryNode> nodes = discoMap.get(g1.name()).ring().allNodes(); + Collection<TcpDiscoveryNode> nodes = ((ServerImpl)discoMap.get(g1.name()).impl).ring().allNodes(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java index 3e895be..da9eed9 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java @@ -41,5 +41,13 @@ public class TcpDiscoverySpiConfigSelfTest extends GridSpiAbstractConfigTest<Tcp checkNegativeSpiProperty(new TcpDiscoverySpi(), "threadPriority", -1); checkNegativeSpiProperty(new TcpDiscoverySpi(), "maxMissedHeartbeats", 0); checkNegativeSpiProperty(new TcpDiscoverySpi(), "statisticsPrintFrequency", 0); + + checkNegativeSpiProperty(new TcpDiscoverySpi().setClientMode(true), "ipFinder", null); + checkNegativeSpiProperty(new TcpDiscoverySpi().setClientMode(true), "networkTimeout", 0); + checkNegativeSpiProperty(new TcpDiscoverySpi().setClientMode(true), "socketTimeout", 0); + checkNegativeSpiProperty(new TcpDiscoverySpi().setClientMode(true), "ackTimeout", 0); + checkNegativeSpiProperty(new TcpDiscoverySpi().setClientMode(true), "heartbeatFrequency", 0); + checkNegativeSpiProperty(new TcpDiscoverySpi().setClientMode(true), "threadPriority", -1); + checkNegativeSpiProperty(new TcpDiscoverySpi().setClientMode(true), "joinTimeout", -1); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 2cae73a..9c42920 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -676,11 +676,11 @@ public abstract class GridAbstractTest extends TestCase { protected IgniteConfiguration optimize(IgniteConfiguration cfg) throws IgniteCheckedException { // TODO: IGNITE-605: propose another way to avoid network overhead in tests. if (cfg.getLocalHost() == null) { - if (cfg.getDiscoverySpi() instanceof TcpDiscoverySpiAdapter) { + if (cfg.getDiscoverySpi() instanceof TcpDiscoverySpi) { cfg.setLocalHost("127.0.0.1"); - if (((TcpDiscoverySpiAdapter)cfg.getDiscoverySpi()).getJoinTimeout() == 0) - ((TcpDiscoverySpiAdapter)cfg.getDiscoverySpi()).setJoinTimeout(8000); + if (((TcpDiscoverySpi)cfg.getDiscoverySpi()).getJoinTimeout() == 0) + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(8000); } else cfg.setLocalHost(getTestResources().getLocalHost()); @@ -740,7 +740,7 @@ public abstract class GridAbstractTest extends TestCase { Collection<Ignite> srvs = new ArrayList<>(); for (Ignite g : G.allGrids()) { - if (g.configuration().getDiscoverySpi() instanceof TcpClientDiscoverySpi) + if (g.configuration().getDiscoverySpi().isClientMode()) clients.add(g); else srvs.add(g); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 8bf8dbc..dc35b24 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -52,7 +52,6 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(GridTcpSpiForwardingSelfTest.class)); suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class)); - suite.addTest(new TestSuite(TcpClientDiscoverySpiConfigSelfTest.class)); suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class)); return suite;