http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java deleted file mode 100644 index 4196306..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ /dev/null @@ -1,1264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.spi.discovery.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.*; -import org.apache.ignite.spi.discovery.*; -import org.apache.ignite.spi.discovery.tcp.internal.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*; -import org.apache.ignite.spi.discovery.tcp.messages.*; -import org.jetbrains.annotations.*; -import org.jsr166.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.events.EventType.*; -import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*; - -/** - * Client discovery SPI implementation that uses TCP/IP for node discovery. - * <p> - * This discovery SPI requires at least one server node configured with - * {@link TcpDiscoverySpi}. It will try to connect to random IP taken from - * {@link TcpDiscoveryIpFinder} which should point to one of these server - * nodes and will maintain connection only with this node (will not enter the ring). - * If this connection is broken, it will try to reconnect using addresses from - * the same IP finder. - */ -@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") -@IgniteSpiMultipleInstancesSupport(true) -@DiscoverySpiOrderSupport(true) -@DiscoverySpiHistorySupport(true) -public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpClientDiscoverySpiMBean { - /** Default disconnect check interval. */ - public static final long DFLT_DISCONNECT_CHECK_INT = 2000; - - /** Remote nodes. 
*/ - private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>(); - - /** Socket. */ - private volatile Socket sock; - - /** Socket reader. */ - private volatile SocketReader sockRdr; - - /** Heartbeat sender. */ - private volatile HeartbeatSender hbSender; - - /** Disconnect handler. */ - private volatile DisconnectHandler disconnectHnd; - - /** Last message ID. */ - private volatile IgniteUuid lastMsgId; - - /** Current topology version. */ - private volatile long topVer; - - /** Join error. */ - private IgniteSpiException joinErr; - - /** Whether reconnect failed. */ - private boolean reconFailed; - - /** Joined latch. */ - private CountDownLatch joinLatch; - - /** Left latch. */ - private volatile CountDownLatch leaveLatch; - - /** Disconnect check interval. */ - private long disconnectCheckInt = DFLT_DISCONNECT_CHECK_INT; - - /** {@inheritDoc} */ - @Override public long getDisconnectCheckInterval() { - return disconnectCheckInt; - } - - /** - * Sets disconnect check interval. - * - * @param disconnectCheckInt Disconnect check interval. - */ - @IgniteSpiConfiguration(optional = true) - public void setDisconnectCheckInterval(long disconnectCheckInt) { - this.disconnectCheckInt = disconnectCheckInt; - } - - /** {@inheritDoc} */ - @Override public long getSocketTimeout() { - return sockTimeout; - } - - /** {@inheritDoc} */ - @Override public long getAckTimeout() { - return ackTimeout; - } - - /** {@inheritDoc} */ - @Override public long getNetworkTimeout() { - return netTimeout; - } - - /** {@inheritDoc} */ - @Override public int getThreadPriority() { - return threadPri; - } - - /** {@inheritDoc} */ - @Override public long getHeartbeatFrequency() { - return hbFreq; - } - - /** {@inheritDoc} */ - @Override public String getIpFinderFormatted() { - return ipFinder.toString(); - } - - /** {@inheritDoc} */ - @Override public int getMessageWorkerQueueSize() { - SocketReader sockRdr0 = sockRdr; - - return sockRdr0 != null ? 
sockRdr0.msgWrk.queueSize() : 0; - } - - /** {@inheritDoc} */ - @Override public long getNodesJoined() { - return stats.joinedNodesCount(); - } - - /** {@inheritDoc} */ - @Override public long getNodesLeft() { - return stats.leftNodesCount(); - } - - /** {@inheritDoc} */ - @Override public long getNodesFailed() { - return stats.failedNodesCount(); - } - - /** {@inheritDoc} */ - @Override public long getAvgMessageProcessingTime() { - return stats.avgMessageProcessingTime(); - } - - /** {@inheritDoc} */ - @Override public long getMaxMessageProcessingTime() { - return stats.maxMessageProcessingTime(); - } - - /** {@inheritDoc} */ - @Override public int getTotalReceivedMessages() { - return stats.totalReceivedMessages(); - } - - /** {@inheritDoc} */ - @Override public Map<String, Integer> getReceivedMessages() { - return stats.receivedMessages(); - } - - /** {@inheritDoc} */ - @Override public int getTotalProcessedMessages() { - return stats.totalProcessedMessages(); - } - - /** {@inheritDoc} */ - @Override public Map<String, Integer> getProcessedMessages() { - return stats.processedMessages(); - } - - /** {@inheritDoc} */ - @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { - startStopwatch(); - - assertParameter(ipFinder != null, "ipFinder != null"); - assertParameter(netTimeout > 0, "networkTimeout > 0"); - assertParameter(sockTimeout > 0, "sockTimeout > 0"); - assertParameter(ackTimeout > 0, "ackTimeout > 0"); - assertParameter(hbFreq > 0, "heartbeatFreq > 0"); - assertParameter(threadPri > 0, "threadPri > 0"); - - try { - locHost = U.resolveLocalHost(locAddr); - } - catch (IOException e) { - throw new IgniteSpiException("Unknown local address: " + locAddr, e); - } - - if (log.isDebugEnabled()) { - log.debug(configInfo("localHost", locHost.getHostAddress())); - log.debug(configInfo("threadPri", threadPri)); - log.debug(configInfo("networkTimeout", netTimeout)); - log.debug(configInfo("sockTimeout", sockTimeout)); - 
log.debug(configInfo("ackTimeout", ackTimeout)); - log.debug(configInfo("ipFinder", ipFinder)); - log.debug(configInfo("heartbeatFreq", hbFreq)); - } - - // Warn on odd network timeout. - if (netTimeout < 3000) - U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout); - - registerMBean(gridName, this, TcpClientDiscoverySpiMBean.class); - - try { - locHost = U.resolveLocalHost(locAddr); - } - catch (IOException e) { - throw new IgniteSpiException("Unknown local address: " + locAddr, e); - } - - if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) { - TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder); - - if (mcastIpFinder.getLocalAddress() == null) - mcastIpFinder.setLocalAddress(locAddr); - } - - IgniteBiTuple<Collection<String>, Collection<String>> addrs; - - try { - addrs = U.resolveLocalAddresses(locHost); - } - catch (IOException | IgniteCheckedException e) { - throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e); - } - - locNode = new TcpDiscoveryNode( - getLocalNodeId(), - addrs.get1(), - addrs.get2(), - 0, - metricsProvider, - locNodeVer); - - locNode.setAttributes(locNodeAttrs); - locNode.local(true); - - sockTimeoutWorker = new SocketTimeoutWorker(); - sockTimeoutWorker.start(); - - joinTopology(false); - - disconnectHnd = new DisconnectHandler(); - disconnectHnd.start(); - - if (log.isDebugEnabled()) - log.debug(startInfo()); - } - - /** {@inheritDoc} */ - @Override public void spiStop() throws IgniteSpiException { - rmtNodes.clear(); - - U.interrupt(disconnectHnd); - U.join(disconnectHnd, log); - - U.interrupt(hbSender); - U.join(hbSender, log); - - Socket sock0 = sock; - - sock = null; - - if (sock0 != null) { - leaveLatch = new CountDownLatch(1); - - try { - TcpDiscoveryNodeLeftMessage msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); - - msg.client(true); - - writeToSocket(sock0, msg); - - if (!U.await(leaveLatch, 
netTimeout, MILLISECONDS)) { - if (log.isDebugEnabled()) - log.debug("Did not receive node left message for local node (will stop anyway) [sock=" + - sock0 + ']'); - } - } - catch (IOException | IgniteCheckedException e) { - if (log.isDebugEnabled()) - U.error(log, "Failed to send node left message (will stop anyway) [sock=" + sock0 + ']', e); - } - finally { - U.closeQuiet(sock0); - } - } - - U.interrupt(sockRdr); - U.join(sockRdr, log); - - U.interrupt(sockTimeoutWorker); - U.join(sockTimeoutWorker, log); - - unregisterMBean(); - - if (log.isDebugEnabled()) - log.debug(stopInfo()); - } - - /** {@inheritDoc} */ - @Override public Collection<Object> injectables() { - return Arrays.<Object>asList(ipFinder); - } - - /** {@inheritDoc} */ - @Override public Collection<ClusterNode> getRemoteNodes() { - return F.view(U.<TcpDiscoveryNode, ClusterNode>arrayList(rmtNodes.values(), new P1<TcpDiscoveryNode>() { - @Override public boolean apply(TcpDiscoveryNode node) { - return node.visible(); - } - })); - } - - /** {@inheritDoc} */ - @Nullable @Override public ClusterNode getNode(UUID nodeId) { - if (getLocalNodeId().equals(nodeId)) - return locNode; - - TcpDiscoveryNode node = rmtNodes.get(nodeId); - - return node != null && node.visible() ? node : null; - } - - /** {@inheritDoc} */ - @Override public boolean pingNode(UUID nodeId) { - assert nodeId != null; - - if (nodeId.equals(getLocalNodeId())) - return true; - - TcpDiscoveryNode node = rmtNodes.get(nodeId); - - return node != null && node.visible(); - } - - /** {@inheritDoc} */ - @Override public void disconnect() throws IgniteSpiException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public void sendCustomEvent(Serializable evt) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public void failNode(UUID nodeId) { - ClusterNode node = rmtNodes.get(nodeId); - - if (node != null) { - TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), - node.id(), node.order()); - - sockRdr.addMessage(msg); - } - } - - /** - * @param recon Reconnect flag. - * @return Whether joined successfully. - * @throws IgniteSpiException In case of error. - */ - private boolean joinTopology(boolean recon) throws IgniteSpiException { - if (!recon) - stats.onJoinStarted(); - - Collection<InetSocketAddress> addrs = null; - - while (!Thread.currentThread().isInterrupted()) { - try { - while (addrs == null || addrs.isEmpty()) { - addrs = resolvedAddresses(); - - if (!F.isEmpty(addrs)) { - if (log.isDebugEnabled()) - log.debug("Resolved addresses from IP finder: " + addrs); - } - else { - U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + ipFinder); - - U.sleep(2000); - } - } - - Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs); - - Iterator<InetSocketAddress> it = addrs.iterator(); - - while (it.hasNext() && !Thread.currentThread().isInterrupted()) { - InetSocketAddress addr = it.next(); - - Socket sock = null; - - try { - long ts = U.currentTimeMillis(); - - IgniteBiTuple<Socket, UUID> t = initConnection(addr); - - sock = t.get1(); - - UUID rmtNodeId = t.get2(); - - stats.onClientSocketInitialized(U.currentTimeMillis() - ts); - - locNode.clientRouterNodeId(rmtNodeId); - - TcpDiscoveryAbstractMessage msg = recon ? 
- new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, - lastMsgId) : - new TcpDiscoveryJoinRequestMessage(locNode, null); - - msg.client(true); - - writeToSocket(sock, msg); - - int res = readReceipt(sock, ackTimeout); - - switch (res) { - case RES_OK: - this.sock = sock; - - sockRdr = new SocketReader(rmtNodeId, new MessageWorker(recon)); - sockRdr.start(); - - if (U.await(joinLatch, netTimeout, MILLISECONDS)) { - IgniteSpiException joinErr0 = joinErr; - - if (joinErr0 != null) - throw joinErr0; - - if (reconFailed) { - if (log.isDebugEnabled()) - log.debug("Failed to reconnect, will try to rejoin [locNode=" + - locNode + ']'); - - U.closeQuiet(sock); - - U.interrupt(sockRdr); - U.join(sockRdr, log); - - this.sock = null; - - return false; - } - - if (log.isDebugEnabled()) - log.debug("Successfully connected to topology [sock=" + sock + ']'); - - hbSender = new HeartbeatSender(); - hbSender.start(); - - stats.onJoinFinished(); - - return true; - } - else { - U.warn(log, "Join process timed out (will try other address) [sock=" + sock + - ", timeout=" + netTimeout + ']'); - - U.closeQuiet(sock); - - U.interrupt(sockRdr); - U.join(sockRdr, log); - - it.remove(); - - break; - } - - case RES_CONTINUE_JOIN: - case RES_WAIT: - U.closeQuiet(sock); - - break; - - default: - if (log.isDebugEnabled()) - log.debug("Received unexpected response to join request: " + res); - - U.closeQuiet(sock); - } - } - catch (IgniteInterruptedCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Joining thread was interrupted."); - - return false; - } - catch (IOException | IgniteCheckedException e) { - if (log.isDebugEnabled()) - U.error(log, "Failed to establish connection with address: " + addr, e); - - U.closeQuiet(sock); - - it.remove(); - } - } - - if (addrs.isEmpty()) { - U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " + - "in 2000ms): " + addrs0); - - U.sleep(2000); - } - } - catch 
(IgniteInterruptedCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Joining thread was interrupted."); - } - } - - return false; - } - - /** - * @param addr Address. - * @return Remote node ID. - * @throws IOException In case of I/O error. - * @throws IgniteCheckedException In case of other error. - */ - private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, IgniteCheckedException { - assert addr != null; - - joinLatch = new CountDownLatch(1); - - Socket sock = openSocket(addr); - - TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId()); - - req.client(true); - - writeToSocket(sock, req); - - TcpDiscoveryHandshakeResponse res = readMessage(sock, null, ackTimeout); - - UUID nodeId = res.creatorNodeId(); - - assert nodeId != null; - assert !getLocalNodeId().equals(nodeId); - - return F.t(sock, nodeId); - } - - /** - * FOR TEST PURPOSE ONLY! - */ - void simulateNodeFailure() { - U.warn(log, "Simulating client node failure: " + getLocalNodeId()); - - U.closeQuiet(sock); - - U.interrupt(disconnectHnd); - U.join(disconnectHnd, log); - - U.interrupt(hbSender); - U.join(hbSender, log); - - U.interrupt(sockRdr); - U.join(sockRdr, log); - - U.interrupt(sockTimeoutWorker); - U.join(sockTimeoutWorker, log); - } - - /** - * Disconnect handler. - */ - private class DisconnectHandler extends IgniteSpiThread { - /** - */ - protected DisconnectHandler() { - super(gridName, "tcp-client-disco-disconnect-hnd", log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - while (!isInterrupted()) { - try { - U.sleep(disconnectCheckInt); - - if (sock == null) { - if (log.isDebugEnabled()) - log.debug("Node is disconnected from topology, will try to reconnect."); - - U.interrupt(hbSender); - U.join(hbSender, log); - - U.interrupt(sockRdr); - U.join(sockRdr, log); - - // If reconnection fails, try to rejoin. 
- if (!joinTopology(true)) { - rmtNodes.clear(); - - locNode.order(0); - - joinTopology(false); - - getSpiContext().recordEvent(new DiscoveryEvent(locNode, - "Client node reconnected: " + locNode, - EVT_CLIENT_NODE_RECONNECTED, locNode)); - } - } - } - catch (IgniteInterruptedCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Disconnect handler was interrupted."); - - return; - } - catch (IgniteSpiException e) { - U.error(log, "Failed to reconnect to topology after failure.", e); - } - } - } - } - - /** - * Heartbeat sender. - */ - private class HeartbeatSender extends IgniteSpiThread { - /** - */ - protected HeartbeatSender() { - super(gridName, "tcp-client-disco-heartbeat-sender", log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - Socket sock0 = sock; - - if (sock0 == null) { - if (log.isDebugEnabled()) - log.debug("Failed to start heartbeat sender, node is already disconnected."); - - return; - } - - try { - while (!isInterrupted()) { - U.sleep(hbFreq); - - TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getLocalNodeId()); - - msg.client(true); - - sockRdr.addMessage(msg); - } - } - catch (IgniteInterruptedCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Heartbeat sender was interrupted."); - } - } - } - - /** - * Socket reader. - */ - private class SocketReader extends IgniteSpiThread { - /** Remote node ID. */ - private final UUID nodeId; - - /** Message worker. */ - private final MessageWorker msgWrk; - - /** - * @param nodeId Node ID. - * @param msgWrk Message worker. 
- */ - protected SocketReader(UUID nodeId, MessageWorker msgWrk) { - super(gridName, "tcp-client-disco-sock-reader", log); - - assert nodeId != null; - assert msgWrk != null; - - this.nodeId = nodeId; - this.msgWrk = msgWrk; - } - - /** {@inheritDoc} */ - @Override public synchronized void start() { - super.start(); - - msgWrk.start(); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - Socket sock0 = sock; - - if (sock0 == null) { - if (log.isDebugEnabled()) - log.debug("Failed to start socket reader, node is already disconnected."); - - return; - } - - try { - InputStream in = new BufferedInputStream(sock0.getInputStream()); - - sock0.setKeepAlive(true); - sock0.setTcpNoDelay(true); - - while (!isInterrupted()) { - try { - TcpDiscoveryAbstractMessage msg = marsh.unmarshal(in, U.gridClassLoader()); - - msg.senderNodeId(nodeId); - - if (log.isDebugEnabled()) - log.debug("Message has been received: " + msg); - - stats.onMessageReceived(msg); - - if (joinLatch.getCount() > 0) { - IgniteSpiException err = null; - - if (msg instanceof TcpDiscoveryDuplicateIdMessage) - err = duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg); - else if (msg instanceof TcpDiscoveryAuthFailedMessage) - err = authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg); - else if (msg instanceof TcpDiscoveryCheckFailedMessage) - err = checkFailedError((TcpDiscoveryCheckFailedMessage)msg); - - if (err != null) { - joinErr = err; - - joinLatch.countDown(); - - return; - } - } - - msgWrk.addMessage(msg); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - U.error(log, "Failed to read message [sock=" + sock0 + ", " + - "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + nodeId + ']', e); - - IOException ioEx = X.cause(e, IOException.class); - - if (ioEx != null) - throw ioEx; - - ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class); - - if (clsNotFoundEx != null) - LT.warn(log, null, "Failed to read 
message due to ClassNotFoundException " + - "(make sure same versions of all classes are available on all nodes) " + - "[rmtNodeId=" + nodeId + ", err=" + clsNotFoundEx.getMessage() + ']'); - else - LT.error(log, e, "Failed to read message [sock=" + sock0 + ", locNodeId=" + - getLocalNodeId() + ", rmtNodeId=" + nodeId + ']'); - } - } - } - catch (IOException e) { - if (log.isDebugEnabled()) - U.error(log, "Connection failed [sock=" + sock0 + ", locNodeId=" + - getLocalNodeId() + ", rmtNodeId=" + nodeId + ']', e); - } - finally { - U.closeQuiet(sock0); - - U.interrupt(msgWrk); - - try { - U.join(msgWrk); - } - catch (IgniteInterruptedCheckedException ignored) { - // No-op. - } - - sock = null; - } - } - - /** - * @param msg Message. - */ - void addMessage(TcpDiscoveryAbstractMessage msg) { - assert msg != null; - - msgWrk.addMessage(msg); - } - } - - /** - * Message worker. - */ - private class MessageWorker extends MessageWorkerAdapter { - /** Topology history. */ - private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>(); - - /** Indicates that reconnection is in progress. */ - private boolean recon; - - /** Indicates that pending messages are currently processed. */ - private boolean pending; - - /** - * @param recon Whether reconnection is in progress. 
- */ - protected MessageWorker(boolean recon) { - super("tcp-client-disco-msg-worker"); - - this.recon = recon; - } - - /** {@inheritDoc} */ - @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { - assert msg != null; - assert msg.verified() || msg.senderNodeId() == null; - - stats.onMessageProcessingStarted(msg); - - if (msg instanceof TcpDiscoveryClientReconnectMessage) - processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg); - else { - if (recon && !pending) { - if (log.isDebugEnabled()) - log.debug("Discarding message received during reconnection: " + msg); - } - else { - if (msg instanceof TcpDiscoveryNodeAddedMessage) - processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); - else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) - processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg); - else if (msg instanceof TcpDiscoveryNodeLeftMessage) - processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg); - else if (msg instanceof TcpDiscoveryNodeFailedMessage) - processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg); - else if (msg instanceof TcpDiscoveryHeartbeatMessage) - processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg); - - if (ensured(msg)) - lastMsgId = msg.id(); - } - } - - stats.onMessageProcessingFinished(msg); - } - - /** - * @param msg Message. 
- */ - private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { - if (leaveLatch != null) - return; - - TcpDiscoveryNode node = msg.node(); - - UUID newNodeId = node.id(); - - if (getLocalNodeId().equals(newNodeId)) { - if (joinLatch.getCount() > 0) { - Collection<TcpDiscoveryNode> top = msg.topology(); - - if (top != null) { - gridStartTime = msg.gridStartTime(); - - for (TcpDiscoveryNode n : top) { - if (n.order() > 0) - n.visible(true); - - rmtNodes.put(n.id(), n); - } - - topHist.clear(); - - if (msg.topologyHistory() != null) - topHist.putAll(msg.topologyHistory()); - - Map<UUID, Map<Integer, byte[]>> dataMap = msg.oldNodesDiscoveryData(); - - if (dataMap != null) { - for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) - onExchange(newNodeId, entry.getKey(), entry.getValue(), null); - } - - locNode.setAttributes(node.attributes()); - locNode.visible(true); - } - else if (log.isDebugEnabled()) - log.debug("Discarding node added message with empty topology: " + msg); - } - else if (log.isDebugEnabled()) - log.debug("Discarding node added message (this message has already been processed) " + - "[msg=" + msg + ", locNode=" + locNode + ']'); - } - else { - boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null; - - if (topChanged) { - if (log.isDebugEnabled()) - log.debug("Added new node to topology: " + node); - - Map<Integer, byte[]> data = msg.newNodeDiscoveryData(); - - if (data != null) - onExchange(newNodeId, newNodeId, data, null); - } - } - } - - /** - * @param msg Message. 
- */ - private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) { - if (leaveLatch != null) - return; - - if (getLocalNodeId().equals(msg.nodeId())) { - if (joinLatch.getCount() > 0) { - long topVer = msg.topologyVersion(); - - locNode.order(topVer); - - notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer)); - - joinErr = null; - - joinLatch.countDown(); - } - else if (log.isDebugEnabled()) - log.debug("Discarding node add finished message (this message has already been processed) " + - "[msg=" + msg + ", locNode=" + locNode + ']'); - } - else { - TcpDiscoveryNode node = rmtNodes.get(msg.nodeId()); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']'); - - return; - } - - long topVer = msg.topologyVersion(); - - node.order(topVer); - node.visible(true); - - if (locNodeVer.equals(node.version())) - node.version(locNodeVer); - - Collection<ClusterNode> top = updateTopologyHistory(topVer); - - if (!pending && joinLatch.getCount() > 0) { - if (log.isDebugEnabled()) - log.debug("Discarding node add finished message (join process is not finished): " + msg); - - return; - } - - notifyDiscovery(EVT_NODE_JOINED, topVer, node, top); - - stats.onNodeJoined(); - } - } - - /** - * @param msg Message. 
- */ - private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) { - if (getLocalNodeId().equals(msg.creatorNodeId())) { - if (log.isDebugEnabled()) - log.debug("Received node left message for local node: " + msg); - - CountDownLatch leaveLatch0 = leaveLatch; - - assert leaveLatch0 != null; - - leaveLatch0.countDown(); - } - else { - if (leaveLatch != null) - return; - - TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId()); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Discarding node left message since node is not found [msg=" + msg + ']'); - - return; - } - - Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion()); - - if (!pending && joinLatch.getCount() > 0) { - if (log.isDebugEnabled()) - log.debug("Discarding node left message (join process is not finished): " + msg); - - return; - } - - notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top); - - stats.onNodeLeft(); - } - } - - /** - * @param msg Message. - */ - private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) { - if (leaveLatch != null) - return; - - if (!getLocalNodeId().equals(msg.creatorNodeId())) { - TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId()); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Discarding node failed message since node is not found [msg=" + msg + ']'); - - return; - } - - Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion()); - - if (!pending && joinLatch.getCount() > 0) { - if (log.isDebugEnabled()) - log.debug("Discarding node failed message (join process is not finished): " + msg); - - return; - } - - notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top); - - stats.onNodeFailed(); - } - } - - /** - * @param msg Message. 
- */ - private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) { - if (leaveLatch != null) - return; - - if (getLocalNodeId().equals(msg.creatorNodeId())) { - if (msg.senderNodeId() == null) { - Socket sock0 = sock; - - if (sock0 != null) { - UUID nodeId = ignite.configuration().getNodeId(); - - msg.setMetrics(nodeId, metricsProvider.metrics()); - - msg.setCacheMetrics(nodeId, metricsProvider.cacheMetrics()); - - try { - writeToSocket(sock0, msg); - - if (log.isDebugEnabled()) - log.debug("Heartbeat message sent [sock=" + sock0 + ", msg=" + msg + ']'); - } - catch (IOException | IgniteCheckedException e) { - if (log.isDebugEnabled()) - U.error(log, "Failed to send heartbeat message [sock=" + sock0 + - ", msg=" + msg + ']', e); - - U.closeQuiet(sock0); - - sock = null; - - interrupt(); - } - } - else if (log.isDebugEnabled()) - log.debug("Failed to send heartbeat message (node is disconnected): " + msg); - } - else if (log.isDebugEnabled()) - log.debug("Received heartbeat response: " + msg); - } - else { - long tstamp = U.currentTimeMillis(); - - if (msg.hasMetrics()) { - for (Map.Entry<UUID, MetricsSet> e : msg.metrics().entrySet()) { - UUID nodeId = e.getKey(); - - MetricsSet metricsSet = e.getValue(); - - Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics() ? - msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap(); - - updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp); - - for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics()) - updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp); - } - } - } - } - - /** - * @param msg Message. 
- */ - private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) { - if (leaveLatch != null) - return; - - if (getLocalNodeId().equals(msg.creatorNodeId())) { - if (msg.success()) { - pending = true; - - try { - for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) - processMessage(pendingMsg); - } - finally { - pending = false; - } - - joinErr = null; - reconFailed = false; - - joinLatch.countDown(); - } - else { - joinErr = null; - reconFailed = true; - - getSpiContext().recordEvent(new DiscoveryEvent(locNode, - "Client node disconnected: " + locNode, - EVT_CLIENT_NODE_DISCONNECTED, locNode)); - - joinLatch.countDown(); - } - } - else if (log.isDebugEnabled()) - log.debug("Discarding reconnect message for another client: " + msg); - } - - /** - * @param nodeId Node ID. - * @param metrics Metrics. - * @param cacheMetrics Cache metrics. - * @param tstamp Timestamp. - */ - private void updateMetrics(UUID nodeId, - ClusterMetrics metrics, - Map<Integer, CacheMetrics> cacheMetrics, - long tstamp) - { - assert nodeId != null; - assert metrics != null; - assert cacheMetrics != null; - - TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId); - - if (node != null && node.visible()) { - node.setMetrics(metrics); - node.setCacheMetrics(cacheMetrics); - - node.lastUpdateTime(tstamp); - - notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allNodes()); - } - else if (log.isDebugEnabled()) - log.debug("Received metrics from unknown node: " + nodeId); - } - - /** - * @param topVer New topology version. - * @return Latest topology snapshot. 
- */ - private Collection<ClusterNode> updateTopologyHistory(long topVer) { - TcpClientDiscoverySpi.this.topVer = topVer; - - Collection<ClusterNode> allNodes = allNodes(); - - if (!topHist.containsKey(topVer)) { - assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 : - "lastVer=" + topHist.lastKey() + ", newVer=" + topVer; - - topHist.put(topVer, allNodes); - - if (topHist.size() > topHistSize) - topHist.pollFirstEntry(); - - assert topHist.lastKey() == topVer; - assert topHist.size() <= topHistSize; - } - - return allNodes; - } - - /** - * @return All nodes. - */ - private Collection<ClusterNode> allNodes() { - Collection<ClusterNode> allNodes = new TreeSet<>(); - - for (TcpDiscoveryNode node : rmtNodes.values()) { - if (node.visible()) - allNodes.add(node); - } - - allNodes.add(locNode); - - return allNodes; - } - - /** - * @param type Event type. - * @param topVer Topology version. - * @param node Node. - * @param top Topology snapshot. - */ - private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top) { - DiscoverySpiListener lsnr = TcpClientDiscoverySpi.this.lsnr; - - if (lsnr != null) { - if (log.isDebugEnabled()) - log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) + - ", topVer=" + topVer + ']'); - - lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist), null); - } - else if (log.isDebugEnabled()) - log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) + - ", topVer=" + topVer + ']'); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java deleted file mode 100644 index 9fe4adc..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.tcp; - -import org.apache.ignite.mxbean.*; -import org.apache.ignite.spi.*; - -import java.util.*; - -/** - * Management bean for {@link TcpClientDiscoverySpi}. - */ -public interface TcpClientDiscoverySpiMBean extends IgniteSpiManagementMBean { - /** - * Gets disconnect check interval. - * - * @return Disconnect check interval. - */ - @MXBeanDescription("Disconnect check interval.") - public long getDisconnectCheckInterval(); - - /** - * Gets socket timeout. - * - * @return Socket timeout. 
- */ - @MXBeanDescription("Socket timeout.") - public long getSocketTimeout(); - - /** - * Gets message acknowledgement timeout. - * - * @return Message acknowledgement timeout. - */ - @MXBeanDescription("Message acknowledgement timeout.") - public long getAckTimeout(); - - /** - * Gets network timeout. - * - * @return Network timeout. - */ - @MXBeanDescription("Network timeout.") - public long getNetworkTimeout(); - - /** - * Gets thread priority. All threads within SPI will be started with it. - * - * @return Thread priority. - */ - @MXBeanDescription("Threads priority.") - public int getThreadPriority(); - - /** - * Gets delay between heartbeat messages sent by coordinator. - * - * @return Time period in milliseconds. - */ - @MXBeanDescription("Heartbeat frequency.") - public long getHeartbeatFrequency(); - - /** - * Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation). - * - * @return IPFinder (string representation). - */ - @MXBeanDescription("IP Finder.") - public String getIpFinderFormatted(); - - /** - * Gets message worker queue current size. - * - * @return Message worker queue current size. - */ - @MXBeanDescription("Message worker queue current size.") - public int getMessageWorkerQueueSize(); - - /** - * Gets joined nodes count. - * - * @return Nodes joined count. - */ - @MXBeanDescription("Nodes joined count.") - public long getNodesJoined(); - - /** - * Gets left nodes count. - * - * @return Left nodes count. - */ - @MXBeanDescription("Nodes left count.") - public long getNodesLeft(); - - /** - * Gets failed nodes count. - * - * @return Failed nodes count. - */ - @MXBeanDescription("Nodes failed count.") - public long getNodesFailed(); - - /** - * Gets avg message processing time. - * - * @return Avg message processing time. - */ - @MXBeanDescription("Avg message processing time.") - public long getAvgMessageProcessingTime(); - - /** - * Gets max message processing time. 
- * - * @return Max message processing time. - */ - @MXBeanDescription("Max message processing time.") - public long getMaxMessageProcessingTime(); - - /** - * Gets total received messages count. - * - * @return Total received messages count. - */ - @MXBeanDescription("Total received messages count.") - public int getTotalReceivedMessages(); - - /** - * Gets received messages counts (grouped by type). - * - * @return Map containing message types and respective counts. - */ - @MXBeanDescription("Received messages by type.") - public Map<String, Integer> getReceivedMessages(); - - /** - * Gets total processed messages count. - * - * @return Total processed messages count. - */ - @MXBeanDescription("Total processed messages count.") - public int getTotalProcessedMessages(); - - /** - * Gets processed messages counts (grouped by type). - * - * @return Map containing message types and respective counts. - */ - @MXBeanDescription("Received messages by type.") - public Map<String, Integer> getProcessedMessages(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java new file mode 100644 index 0000000..b7e9e53 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * + */ +abstract class TcpDiscoveryImpl { + /** Response OK. */ + protected static final int RES_OK = 1; + + /** Response CONTINUE JOIN. */ + protected static final int RES_CONTINUE_JOIN = 100; + + /** Response WAIT. */ + protected static final int RES_WAIT = 200; + + /** */ + protected final TcpDiscoverySpi spi; + + /** */ + protected final IgniteLogger log; + + /** */ + protected TcpDiscoveryNode locNode; + + /** + * @param spi Adapter. + */ + TcpDiscoveryImpl(TcpDiscoverySpi spi) { + this.spi = spi; + + log = spi.log; + } + + /** + * @return Local node ID. + */ + public UUID getLocalNodeId() { + return spi.getLocalNodeId(); + } + + /** + * @param msg Error message. + * @param e Exception. + */ + protected void onException(String msg, Exception e){ + spi.getExceptionRegistry().onException(msg, e); + } + + /** + * @param log Logger. + */ + public abstract void dumpDebugInfo(IgniteLogger log); + + /** + * @return SPI state string. + */ + public abstract String getSpiState(); + + /** + * @return Message worker queue current size. + */ + public abstract int getMessageWorkerQueueSize(); + + /** + * @return Coordinator ID. 
+ */ + public abstract UUID getCoordinator(); + + /** + * @return Collection of remote nodes. + */ + public abstract Collection<ClusterNode> getRemoteNodes(); + + /** + * @param nodeId Node id. + * @return Node with given ID or {@code null} if node is not found. + */ + @Nullable public abstract ClusterNode getNode(UUID nodeId); + + /** + * @param nodeId Node id. + * @return {@code true} if node alive, {@code false} otherwise. + */ + public abstract boolean pingNode(UUID nodeId); + + /** + * Tells discovery SPI to disconnect from topology. + * + * @throws IgniteSpiException If failed. + */ + public abstract void disconnect() throws IgniteSpiException; + + /** + * @param msg Message. + * @throws IgniteException If failed. + */ + public abstract void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException; + + /** + * @param nodeId Node id. + */ + public abstract void failNode(UUID nodeId); + + /** + * @param gridName Grid name. + * @throws IgniteSpiException If failed. + */ + public abstract void spiStart(@Nullable String gridName) throws IgniteSpiException; + + /** + * @throws IgniteSpiException If failed. + */ + public abstract void spiStop() throws IgniteSpiException; + + /** + * @param spiCtx Spi context. + * @throws IgniteSpiException If failed. + */ + public abstract void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException; + + /** + * @param t Thread. + * @return Status as string. + */ + protected static String threadStatus(Thread t) { + if (t == null) + return "N/A"; + + return t.isAlive() ? "alive" : "dead"; + } + + /** + * <strong>FOR TEST ONLY!!!</strong> + * <p> + * Simulates this node failure by stopping service threads. So, node will become + * unresponsive. + * <p> + * This method is intended for test purposes only. + */ + abstract void simulateNodeFailure(); + + /** + * FOR TEST PURPOSE ONLY! + */ + public abstract void brakeConnection(); + + /** + * <strong>FOR TEST ONLY!!!</strong> + * + * @return Worker thread. 
+ */ + protected abstract IgniteSpiThread workerThread(); + + /** + * @throws IgniteSpiException If failed. + */ + @SuppressWarnings("BusyWait") + protected final void registerLocalNodeAddress() throws IgniteSpiException { + // Make sure address registration succeeded. + while (true) { + try { + spi.ipFinder.initializeLocalAddresses(locNode.socketAddresses()); + + // Success. + break; + } + catch (IllegalStateException e) { + throw new IgniteSpiException("Failed to register local node address with IP finder: " + + locNode.socketAddresses(), e); + } + catch (IgniteSpiException e) { + LT.error(log, e, "Failed to register local node address in IP finder on start " + + "(retrying every 2000 ms)."); + } + + try { + U.sleep(2000); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteSpiException("Thread has been interrupted.", e); + } + } + } +}