http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java new file mode 100644 index 0000000..7e1f592 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -0,0 +1,4792 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.events.*; +import org.apache.ignite.internal.processors.security.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.security.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.jetbrains.annotations.*; +import org.jsr166.*; + +import java.io.*; +import java.net.*; +import java.text.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.IgniteNodeAttributes.*; +import static org.apache.ignite.spi.IgnitePortProtocol.*; +import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*; +import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*; + +/** + * + */ +class ServerImpl extends TcpDiscoveryImpl { + /** */ + private final Executor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>()); + + /** Nodes ring. */ + @GridToStringExclude + private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing(); + + /** Topology snapshots history. */ + private final SortedMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>(); + + /** Socket readers. 
*/ + private final Collection<SocketReader> readers = new LinkedList<>(); + + /** TCP server for discovery SPI. */ + private TcpServer tcpSrvr; + + /** Message worker. */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private RingMessageWorker msgWorker; + + /** Client message workers. */ + private ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>(); + + /** Metrics sender. */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private HeartbeatsSender hbsSnd; + + /** Status checker. */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private CheckStatusSender chkStatusSnd; + + /** IP finder cleaner. */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private IpFinderCleaner ipFinderCleaner; + + /** Statistics printer thread. */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private StatisticsPrinter statsPrinter; + + /** Failed nodes (but still in topology). */ + private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>(); + + /** Leaving nodes (but still in topology). */ + private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>(); + + /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */ + private boolean ipFinderHasLocAddr; + + /** Addresses that do not respond during join requests send (for resolving concurrent start). */ + private final Collection<SocketAddress> noResAddrs = new GridConcurrentHashSet<>(); + + /** Addresses that incoming join requests send were send from (for resolving concurrent start). */ + private final Collection<SocketAddress> fromAddrs = new GridConcurrentHashSet<>(); + + /** Response on join request from coordinator (in case of duplicate ID or auth failure). */ + private final GridTuple<TcpDiscoveryAbstractMessage> joinRes = F.t1(); + + /** Node authenticator. */ + private DiscoverySpiNodeAuthenticator nodeAuth; + + /** Mutex. 
*/
+    private final Object mux = new Object();
+
+    /** Discovery state. */
+    protected TcpDiscoverySpiState spiState = DISCONNECTED;
+
+    /** Map with proceeding ping requests. */
+    private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap =
+        new ConcurrentHashMap8<>();
+
+    /** Debug mode. */
+    private boolean debugMode;
+
+    /** Debug messages history. */
+    private int debugMsgHist = 512;
+
+    /** Received messages. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private ConcurrentLinkedDeque<String> debugLog;
+
+    /**
+     * @param adapter Adapter.
+     */
+    ServerImpl(TcpDiscoverySpi adapter) {
+        super(adapter);
+    }
+
+    /**
+     * This method is intended for troubleshooting purposes only.
+     *
+     * @param debugMode {@code True} to start SPI in debug mode.
+     */
+    public void setDebugMode(boolean debugMode) {
+        this.debugMode = debugMode;
+    }
+
+    /**
+     * This method is intended for troubleshooting purposes only.
+     *
+     * @param debugMsgHist Message history log size.
+     */
+    public void setDebugMessageHistory(int debugMsgHist) {
+        this.debugMsgHist = debugMsgHist;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getSpiState() {
+        // State is read under the same mutex that guards its updates.
+        synchronized (mux) {
+            return spiState.name();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMessageWorkerQueueSize() {
+        return msgWorker.queueSize();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public UUID getCoordinator() {
+        TcpDiscoveryNode crd = resolveCoordinator();
+
+        // No coordinator resolved: report null rather than failing.
+        return crd != null ? crd.id() : null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+        assert nodeId != null;
+
+        UUID locNodeId0 = getLocalNodeId();
+
+        if (locNodeId0 != null && locNodeId0.equals(nodeId))
+            // Return local node directly.
+            return locNode;
+
+        TcpDiscoveryNode node = ring.node(nodeId);
+
+        // Nodes that are in the ring but not yet visible are not exposed to callers.
+        if (node != null && !node.visible())
+            return null;
+
+        return node;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> getRemoteNodes() {
+        return F.upcast(ring.visibleRemoteNodes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStart(String gridName) throws IgniteSpiException {
+        synchronized (mux) {
+            spiState = DISCONNECTED;
+        }
+
+        if (debugMode) {
+            // Debug mode is refused when info logging is disabled.
+            if (!log.isInfoEnabled())
+                throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
+                    "in debug mode.");
+
+            debugLog = new ConcurrentLinkedDeque<>();
+
+            U.quietAndWarn(log, "TCP discovery SPI is configured in debug mode.");
+        }
+
+        // Clear addresses collections.
+        fromAddrs.clear();
+        noResAddrs.clear();
+
+        msgWorker = new RingMessageWorker();
+        msgWorker.start();
+
+        tcpSrvr = new TcpServer();
+
+        // Local node is initialized with the port taken from the TCP server created above.
+        adapter.initLocalNode(tcpSrvr.port, true);
+
+        locNode = adapter.locNode;
+
+        // Start TCP server thread after local node is initialized.
+        tcpSrvr.start();
+
+        ring.localNode(locNode);
+
+        if (adapter.ipFinder.isShared())
+            registerLocalNodeAddress();
+        else {
+            // A non-shared IP finder must come with a pre-configured address list.
+            if (F.isEmpty(adapter.ipFinder.getRegisteredAddresses()))
+                throw new IgniteSpiException("Non-shared IP finder must have IP addresses specified in " +
+                    "GridTcpDiscoveryIpFinder.getRegisteredAddresses() configuration property " +
+                    "(specify list of IP addresses in configuration).");
+
+            ipFinderHasLocAddr = adapter.ipFinderHasLocalAddress();
+        }
+
+        // Statistics printer is started only when a print frequency is set and info logging is on.
+        if (adapter.getStatisticsPrintFrequency() > 0 && log.isInfoEnabled()) {
+            statsPrinter = new StatisticsPrinter();
+            statsPrinter.start();
+        }
+
+        adapter.stats.onJoinStarted();
+
+        joinTopology();
+
+        adapter.stats.onJoinFinished();
+
+        // Heartbeat and status-check senders are started only after the join has finished.
+        hbsSnd = new HeartbeatsSender();
+        hbsSnd.start();
+
+        chkStatusSnd = new CheckStatusSender();
+        chkStatusSnd.start();
+
+        if (adapter.ipFinder.isShared()) {
+            ipFinderCleaner = new IpFinderCleaner();
+            ipFinderCleaner.start();
+        }
+
+        adapter.printStartInfo();
+    }
+
+    /**
+     * Registers local node socket addresses with the IP finder, retrying every 2000 ms
+     * while the IP finder reports {@link IgniteSpiException}.
+     *
+     * @throws IgniteSpiException If registration failed with {@link IllegalStateException}
+     *      or the thread was interrupted while waiting for a retry.
+     */
+    @SuppressWarnings("BusyWait")
+    private void registerLocalNodeAddress() throws IgniteSpiException {
+        // Make sure address registration succeeded.
+        while (true) {
+            try {
+                adapter.ipFinder.initializeLocalAddresses(locNode.socketAddresses());
+
+                // Success.
+                break;
+            }
+            catch (IllegalStateException e) {
+                // Not retried: converted to an SPI exception right away.
+                throw new IgniteSpiException("Failed to register local node address with IP finder: " +
+                    locNode.socketAddresses(), e);
+            }
+            catch (IgniteSpiException e) {
+                // Logged only; the loop sleeps below and tries again.
+                LT.error(log, e, "Failed to register local node address in IP finder on start " +
+                    "(retrying every 2000 ms).");
+            }
+
+            try {
+                U.sleep(2000);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new IgniteSpiException("Thread has been interrupted.", e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+        spiCtx.registerPort(tcpSrvr.port, TCP);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStop() throws IgniteSpiException {
+        // Final stop (not a disconnect).
+        spiStop0(false);
+    }
+
+    /**
+     * Stops SPI finally or stops SPI for restart.
+     *
+     * @param disconnect {@code True} if SPI is being disconnected.
+     * @throws IgniteSpiException If failed.
+     */
+    private void spiStop0(boolean disconnect) throws IgniteSpiException {
+        if (log.isDebugEnabled()) {
+            if (disconnect)
+                log.debug("Disconnecting SPI.");
+            else
+                log.debug("Preparing to start local node stop procedure.");
+        }
+
+        if (disconnect) {
+            synchronized (mux) {
+                spiState = DISCONNECTING;
+            }
+        }
+
+        if (msgWorker != null && msgWorker.isAlive() && !disconnect) {
+            // Send node left message only if it is final stop.
+            msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id()));
+
+            synchronized (mux) {
+                // Wait for the state to become LEFT, but no longer than the network timeout in total.
+                long threshold = U.currentTimeMillis() + adapter.netTimeout;
+
+                long timeout = adapter.netTimeout;
+
+                while (spiState != LEFT && timeout > 0) {
+                    try {
+                        mux.wait(timeout);
+
+                        // Recompute the remaining time after every wake-up.
+                        timeout = threshold - U.currentTimeMillis();
+                    }
+                    catch (InterruptedException ignored) {
+                        // Restore the interrupt flag and stop waiting.
+                        Thread.currentThread().interrupt();
+
+                        break;
+                    }
+                }
+
+                if (spiState == LEFT) {
+                    if (log.isDebugEnabled())
+                        log.debug("Verification for local node leave has been received from coordinator" +
+                            " (continuing stop procedure).");
+                }
+                else if (log.isInfoEnabled()) {
+                    log.info("No verification for local node leave has been received from coordinator" +
+                        " (will stop node anyway).");
+                }
+            }
+        }
+
+        // Workers are interrupted and joined one by one, in the order listed below.
+        U.interrupt(tcpSrvr);
+        U.join(tcpSrvr, log);
+
+        Collection<SocketReader> tmp;
+
+        // Readers are copied under the mutex, then interrupted and joined outside of it.
+        synchronized (mux) {
+            tmp = U.arrayList(readers);
+        }
+
+        U.interrupt(tmp);
+        U.joinThreads(tmp, log);
+
+        U.interrupt(hbsSnd);
+        U.join(hbsSnd, log);
+
+        U.interrupt(chkStatusSnd);
+        U.join(chkStatusSnd, log);
+
+        U.interrupt(ipFinderCleaner);
+        U.join(ipFinderCleaner, log);
+
+        U.interrupt(msgWorker);
+        U.join(msgWorker, log);
+
+        U.interrupt(statsPrinter);
+        U.join(statsPrinter, log);
+
+        Collection<TcpDiscoveryNode> rmts = null;
+
+        if (!disconnect)
+            adapter.printStopInfo();
+        else {
+            adapter.getSpiContext().deregisterPorts();
+
+            // Visible remote nodes are captured before the ring is cleared below.
+            rmts = ring.visibleRemoteNodes();
+        }
+
+        long topVer = ring.topologyVersion();
+
+        ring.clear();
+
+        if (rmts != null && !rmts.isEmpty()) {
+            // This is restart/disconnection and remote nodes are not empty.
+            // We need to fire FAIL event for each.
+ DiscoverySpiListener lsnr = adapter.lsnr; + + if (lsnr != null) { + Set<ClusterNode> processed = new HashSet<>(); + + for (TcpDiscoveryNode n : rmts) { + assert n.visible(); + + processed.add(n); + + List<ClusterNode> top = U.arrayList(rmts, F.notIn(processed)); + + topVer++; + + Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, + Collections.unmodifiableList(top)); + + lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist, null); + } + } + } + + printStatistics(); + + adapter.stats.clear(); + + synchronized (mux) { + // Clear stored data. + leavingNodes.clear(); + failedNodes.clear(); + + spiState = DISCONNECTED; + } + } + + /** {@inheritDoc} */ + @Override public boolean pingNode(UUID nodeId) { + assert nodeId != null; + + if (log.isDebugEnabled()) + log.debug("Pinging node: " + nodeId + "]."); + + if (nodeId == getLocalNodeId()) + return true; + + TcpDiscoveryNode node = ring.node(nodeId); + + if (node == null || !node.visible()) + return false; + + boolean res = pingNode(node); + + if (!res && !node.isClient()) { + LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId); + + msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id())); + } + + return res; + } + + /** + * Pings the remote node to see if it's alive. + * + * @param node Node. + * @return {@code True} if ping succeeds. + */ + private boolean pingNode(TcpDiscoveryNode node) { + assert node != null; + + if (node.id().equals(getLocalNodeId())) + return true; + + UUID clientNodeId = null; + + if (node.isClient()) { + clientNodeId = node.id(); + + node = ring.node(node.clientRouterNodeId()); + + if (node == null || !node.visible()) + return false; + } + + for (InetSocketAddress addr : adapter.getNodeAddresses(node, U.sameMacs(locNode, node))) { + try { + // ID returned by the node should be the same as ID of the parameter for ping to succeed. 
+ IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId); + + return node.id().equals(t.get1()) && (clientNodeId == null || t.get2()); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']', e); + // continue; + } + } + + return false; + } + + /** + * Pings the node by its address to see if it's alive. + * + * @param addr Address of the node. + * @return ID of the remote node and "client exists" flag if node alive. + * @throws IgniteSpiException If an error occurs. + */ + private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId) + throws IgniteCheckedException { + assert addr != null; + + UUID locNodeId = getLocalNodeId(); + + if (F.contains(adapter.locNodeAddrs, addr)) { + if (clientNodeId == null) + return F.t(getLocalNodeId(), false); + + ClientMessageWorker clientWorker = clientMsgWorkers.get(clientNodeId); + + if (clientWorker == null) + return F.t(getLocalNodeId(), false); + + boolean clientPingRes; + + try { + clientPingRes = clientWorker.ping(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + + return F.t(getLocalNodeId(), clientPingRes); + } + + GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>(); + + IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut); + + if (oldFut != null) + return oldFut.get(); + else { + Collection<Throwable> errs = null; + + try { + Socket sock = null; + + for (int i = 0; i < adapter.reconCnt; i++) { + try { + if (addr.isUnresolved()) + addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()); + + long tstamp = U.currentTimeMillis(); + + sock = adapter.openSocket(addr); + + adapter.writeToSocket(sock, new 
TcpDiscoveryPingRequest(locNodeId, clientNodeId)); + + TcpDiscoveryPingResponse res = adapter.readMessage(sock, null, adapter.netTimeout); + + if (locNodeId.equals(res.creatorNodeId())) { + if (log.isDebugEnabled()) + log.debug("Ping response from local node: " + res); + + break; + } + + adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); + + IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists()); + + fut.onDone(t); + + return t; + } + catch (IOException | IgniteCheckedException e) { + if (errs == null) + errs = new ArrayList<>(); + + errs.add(e); + } + finally { + U.closeQuiet(sock); + } + } + } + catch (Throwable t) { + fut.onDone(t); + + if (t instanceof Error) + throw t; + + throw U.cast(t); + } + finally { + if (!fut.isDone()) + fut.onDone(U.exceptionWithSuppressed("Failed to ping node by address: " + addr, errs)); + + boolean b = pingMap.remove(addr, fut); + + assert b; + } + + return fut.get(); + } + } + + /** {@inheritDoc} */ + @Override public void disconnect() throws IgniteSpiException { + spiStop0(true); + } + + /** {@inheritDoc} */ + @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator nodeAuth) { + this.nodeAuth = nodeAuth; + } + + /** {@inheritDoc} */ + @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { + try { + msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, adapter.marsh.marshal(evt))); + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); + } + } + + /** {@inheritDoc} */ + @Override public void failNode(UUID nodeId) { + ClusterNode node = ring.node(nodeId); + + if (node != null) { + TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), + node.id(), node.order()); + + msgWorker.addMessage(msg); + } + } + + /** + * Tries to join this node to topology. + * + * @throws IgniteSpiException If any error occurs. 
+     */
+    private void joinTopology() throws IgniteSpiException {
+        synchronized (mux) {
+            assert spiState == CONNECTING || spiState == DISCONNECTED;
+
+            spiState = CONNECTING;
+        }
+
+        // Credentials are read before marshalCredentials() replaces the attribute value.
+        SecurityCredentials locCred = (SecurityCredentials)locNode.getAttributes()
+            .get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+        // Marshal credentials for backward compatibility and security.
+        marshalCredentials(locNode);
+
+        while (true) {
+            if (!sendJoinRequestMessage()) {
+                if (log.isDebugEnabled())
+                    log.debug("Join request message has not been sent (local node is the first in the topology).");
+
+                if (nodeAuth != null) {
+                    // Authenticate local node.
+                    try {
+                        SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred);
+
+                        if (subj == null)
+                            throw new IgniteSpiException("Authentication failed for local node: " + locNode.id());
+
+                        Map<String, Object> attrs = new HashMap<>(locNode.attributes());
+
+                        // Store the marshalled security subject and drop the credentials attribute.
+                        attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT,
+                            adapter.ignite().configuration().getMarshaller().marshal(subj));
+                        attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+                        locNode.setAttributes(attrs);
+                    }
+                    catch (IgniteException | IgniteCheckedException e) {
+                        throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e);
+                    }
+                }
+
+                // Local node starts a new topology: order 1, topology version 1.
+                locNode.order(1);
+                locNode.internalOrder(1);
+
+                adapter.gridStartTime = U.currentTimeMillis();
+
+                locNode.visible(true);
+
+                ring.clear();
+
+                ring.topologyVersion(1);
+
+                synchronized (mux) {
+                    topHist.clear();
+
+                    spiState = CONNECTED;
+
+                    mux.notifyAll();
+                }
+
+                notifyDiscovery(EVT_NODE_JOINED, 1, locNode);
+
+                break;
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Join request message has been sent (waiting for coordinator response).");
+
+            synchronized (mux) {
+                // Wait for the state to leave CONNECTING, but no longer than the network timeout in total.
+                long threshold = U.currentTimeMillis() + adapter.netTimeout;
+
+                long timeout = adapter.netTimeout;
+
+                while (spiState == CONNECTING && timeout > 0) {
+                    try {
+                        mux.wait(timeout);
+
+                        timeout = threshold - U.currentTimeMillis();
+                    }
+                    catch (InterruptedException ignored) {
+                        Thread.currentThread().interrupt();
+
+                        throw new IgniteSpiException("Thread has been interrupted.");
+                    }
+                }
+
+                if (spiState == CONNECTED)
+                    break;
+                else if (spiState == DUPLICATE_ID)
+                    throw adapter.duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get());
+                else if (spiState == AUTH_FAILED)
+                    throw adapter.authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get());
+                else if (spiState == CHECK_FAILED)
+                    throw adapter.checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get());
+                else if (spiState == LOOPBACK_PROBLEM) {
+                    TcpDiscoveryLoopbackProblemMessage msg = (TcpDiscoveryLoopbackProblemMessage)joinRes.get();
+
+                    boolean locHostLoopback = adapter.locHost.isLoopbackAddress();
+
+                    String firstNode = locHostLoopback ? "local" : "remote";
+
+                    String secondNode = locHostLoopback ? "remote" : "local";
+
+                    throw new IgniteSpiException("Failed to add node to topology because " + firstNode +
+                        " node is configured to use loopback address, but " + secondNode + " node is not " +
+                        "(consider changing 'localAddress' configuration parameter) " +
+                        "[locNodeAddrs=" + U.addressesAsString(locNode) + ", rmtNodeAddrs=" +
+                        U.addressesAsString(msg.addresses(), msg.hostNames()) + ']');
+                }
+                else
+                    // Any other state: warn and repeat the outer loop.
+                    LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " +
+                        "Check remote nodes logs for possible error messages. " +
+                        "Note that large topology may require significant time to start. " +
+                        "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " +
+                        "if getting this message on the starting nodes [networkTimeout=" + adapter.netTimeout + ']');
+            }
+        }
+
+        assert locNode.order() != 0;
+        assert locNode.internalOrder() != 0;
+
+        if (log.isDebugEnabled())
+            log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder());
+    }
+
+    /**
+     * Tries to send join request message to a random node presenting in topology.
+     * Address is provided by {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} and message is
+     * sent to first node connection succeeded to.
+     *
+     * @return {@code true} if send succeeded.
+     * @throws IgniteSpiException If any error occurs.
+     */
+    @SuppressWarnings({"BusyWait"})
+    private boolean sendJoinRequestMessage() throws IgniteSpiException {
+        TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
+            adapter.collectExchangeData(getLocalNodeId()));
+
+        // Time when it has been detected, that addresses from IP finder do not respond.
+        long noResStart = 0;
+
+        while (true) {
+            Collection<InetSocketAddress> addrs = adapter.resolvedAddresses();
+
+            // No addresses to try: report that the request was not sent.
+            if (F.isEmpty(addrs))
+                return false;
+
+            boolean retry = false;
+            Collection<Exception> errs = new ArrayList<>();
+
+            try (SocketMultiConnector multiConnector = new SocketMultiConnector(adapter, addrs, 2)) {
+                GridTuple3<InetSocketAddress, Socket, Exception> tuple;
+
+                // Each tuple carries an address together with either a socket or an exception.
+                while ((tuple = multiConnector.next()) != null) {
+                    InetSocketAddress addr = tuple.get1();
+                    Socket sock = tuple.get2();
+                    Exception ex = tuple.get3();
+
+                    if (ex == null) {
+                        assert sock != null;
+
+                        try {
+                            Integer res = sendMessageDirectly(joinReq, addr, sock);
+
+                            assert res != null;
+
+                            noResAddrs.remove(addr);
+
+                            // Address is responsive, reset period start.
+                            noResStart = 0;
+
+                            switch (res) {
+                                case RES_WAIT:
+                                    // Concurrent startup, try sending join request again or wait if no success.
+                                    retry = true;
+
+                                    break;
+                                case RES_OK:
+                                    if (log.isDebugEnabled())
+                                        log.debug("Join request message has been sent to address [addr=" + addr +
+                                            ", req=" + joinReq + ']');
+
+                                    // Join request sending succeeded, wait for response from topology.
+                                    return true;
+
+                                default:
+                                    // Concurrent startup, try next node.
+ if (res == RES_CONTINUE_JOIN) { + if (!fromAddrs.contains(addr)) + retry = true; + } + else { + if (log.isDebugEnabled()) + log.debug("Unexpected response to join request: " + res); + + retry = true; + } + + break; + } + } + catch (IgniteSpiException e) { + e.printStackTrace(); + + ex = e; + } + } + + if (ex != null) { + errs.add(ex); + + if (log.isDebugEnabled()) { + IOException ioe = X.cause(ex, IOException.class); + + log.debug("Failed to send join request message [addr=" + addr + + ", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']'); + + onException("Failed to send join request message [addr=" + addr + + ", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']', ioe); + } + + noResAddrs.add(addr); + } + } + } + + if (retry) { + if (log.isDebugEnabled()) + log.debug("Concurrent discovery SPI start has been detected (local node should wait)."); + + try { + U.sleep(2000); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteSpiException("Thread has been interrupted.", e); + } + } + else if (!adapter.ipFinder.isShared() && !ipFinderHasLocAddr) { + IgniteCheckedException e = null; + + if (!errs.isEmpty()) { + e = new IgniteCheckedException("Multiple connection attempts failed."); + + for (Exception err : errs) + e.addSuppressed(err); + } + + if (e != null && X.hasCause(e, ConnectException.class)) + LT.warn(log, null, "Failed to connect to any address from IP finder " + + "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " + + addrs); + + if (adapter.joinTimeout > 0) { + if (noResStart == 0) + noResStart = U.currentTimeMillis(); + else if (U.currentTimeMillis() - noResStart > adapter.joinTimeout) + throw new IgniteSpiException( + "Failed to connect to any address from IP finder within join timeout " + + "(make sure IP finder addresses are correct, and operating system firewalls are disabled " + + "on all host machines, or consider increasing 'joinTimeout' configuration property): " 
+ + addrs, e); + } + + try { + U.sleep(2000); + } + catch (IgniteInterruptedCheckedException ex) { + throw new IgniteSpiException("Thread has been interrupted.", ex); + } + } + else + break; + } + + return false; + } + + /** + * Establishes connection to an address, sends message and returns the response (if any). + * + * @param msg Message to send. + * @param addr Address to send message to. + * @return Response read from the recipient or {@code null} if no response is supposed. + * @throws IgniteSpiException If an error occurs. + */ + @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr, Socket sock) + throws IgniteSpiException { + assert msg != null; + assert addr != null; + + Collection<Throwable> errs = null; + + long ackTimeout0 = adapter.ackTimeout; + + int connectAttempts = 1; + + boolean joinReqSent = false; + + UUID locNodeId = getLocalNodeId(); + + for (int i = 0; i < adapter.reconCnt; i++) { + // Need to set to false on each new iteration, + // since remote node may leave in the middle of the first iteration. + joinReqSent = false; + + boolean openSock = false; + + try { + long tstamp = U.currentTimeMillis(); + + if (sock == null) + sock = adapter.openSocket(addr); + + openSock = true; + + // Handshake. + adapter.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); + + TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, ackTimeout0); + + if (locNodeId.equals(res.creatorNodeId())) { + if (log.isDebugEnabled()) + log.debug("Handshake response from local node: " + res); + + break; + } + + adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); + + // Send message. 
+ tstamp = U.currentTimeMillis(); + + adapter.writeToSocket(sock, msg); + + adapter.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); + + if (debugMode) + debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + + ", rmtNodeId=" + res.creatorNodeId() + ']'); + + if (log.isDebugEnabled()) + log.debug("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + + ", rmtNodeId=" + res.creatorNodeId() + ']'); + + // Connection has been established, but + // join request may not be unmarshalled on remote host. + // E.g. due to class not found issue. + joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; + + return adapter.readReceipt(sock, ackTimeout0); + } + catch (ClassCastException e) { + // This issue is rarely reproducible on AmazonEC2, but never + // on dedicated machines. + if (log.isDebugEnabled()) + U.error(log, "Class cast exception on direct send: " + addr, e); + + onException("Class cast exception on direct send: " + addr, e); + + if (errs == null) + errs = new ArrayList<>(); + + errs.add(e); + } + catch (IOException | IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.error("Exception on direct send: " + e.getMessage(), e); + + onException("Exception on direct send: " + e.getMessage(), e); + + if (errs == null) + errs = new ArrayList<>(); + + errs.add(e); + + if (!openSock) { + // Reconnect for the second time, if connection is not established. + if (connectAttempts < 2) { + connectAttempts++; + + continue; + } + + break; // Don't retry if we can not establish connection. 
+ } + + if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { + ackTimeout0 *= 2; + + if (!checkAckTimeout(ackTimeout0)) + break; + } + } + finally { + U.closeQuiet(sock); + + sock = null; + } + } + + if (joinReqSent) { + if (log.isDebugEnabled()) + log.debug("Join request has been sent, but receipt has not been read (returning RES_WAIT)."); + + // Topology will not include this node, + // however, warning on timed out join will be output. + return RES_OK; + } + + throw new IgniteSpiException( + "Failed to send message to address [addr=" + addr + ", msg=" + msg + ']', + U.exceptionWithSuppressed("Failed to send message to address " + + "[addr=" + addr + ", msg=" + msg + ']', errs)); + } + + /** + * Marshalls credentials with discovery SPI marshaller (will replace attribute value). + * + * @param node Node to marshall credentials for. + * @throws IgniteSpiException If marshalling failed. + */ + private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { + try { + // Use security-unsafe getter. + Map<String, Object> attrs = new HashMap<>(node.getAttributes()); + + attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS, + adapter.marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))); + + node.setAttributes(attrs); + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e); + } + } + + /** + * Unmarshalls credentials with discovery SPI marshaller (will not replace attribute value). + * + * @param node Node to unmarshall credentials for. + * @return Security credentials. + * @throws IgniteSpiException If unmarshal fails. 
+ */ + private SecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { + try { + byte[] credBytes = (byte[])node.getAttributes().get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); + + if (credBytes == null) + return null; + + return adapter.marsh.unmarshal(credBytes, null); + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e); + } + } + + /** + * @param ackTimeout Acknowledgement timeout. + * @return {@code True} if acknowledgement timeout is less or equal to + * maximum acknowledgement timeout, {@code false} otherwise. + */ + private boolean checkAckTimeout(long ackTimeout) { + if (ackTimeout > adapter.maxAckTimeout) { + LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " + + "(consider increasing 'maxAckTimeout' configuration property) " + + "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + adapter.maxAckTimeout + ']'); + + return false; + } + + return true; + } + + /** + * Notify external listener on discovery event. + * + * @param type Discovery event type. See {@link org.apache.ignite.events.DiscoveryEvent} for more details. + * @param topVer Topology version. + * @param node Remote node this event is connected with. 
+ */ + private void notifyDiscovery(int type, long topVer, TcpDiscoveryNode node) { + assert type > 0; + assert node != null; + + DiscoverySpiListener lsnr = adapter.lsnr; + + TcpDiscoverySpiState spiState = spiStateCopy(); + + if (lsnr != null && node.visible() && (spiState == CONNECTED || spiState == DISCONNECTING)) { + if (log.isDebugEnabled()) + log.debug("Discovery notification [node=" + node + ", spiState=" + spiState + + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); + + Collection<ClusterNode> top = F.upcast(ring.visibleNodes()); + + Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top); + + lsnr.onDiscovery(type, topVer, node, top, hist, null); + } + else if (log.isDebugEnabled()) + log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState + + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); + } + + /** + * Update topology history with new topology snapshots. + * + * @param topVer Topology version. + * @param top Topology snapshot. + * @return Copy of updated topology history. + */ + @Nullable private Map<Long, Collection<ClusterNode>> updateTopologyHistory(long topVer, Collection<ClusterNode> top) { + synchronized (mux) { + if (topHist.containsKey(topVer)) + return null; + + topHist.put(topVer, top); + + while (topHist.size() > adapter.topHistSize) + topHist.remove(topHist.firstKey()); + + if (log.isDebugEnabled()) + log.debug("Added topology snapshot to history, topVer=" + topVer + ", historySize=" + topHist.size()); + + return new TreeMap<>(topHist); + } + } + + /** + * Checks whether local node is coordinator. Nodes that are leaving or failed + * (but are still in topology) are removed from search. + * + * @return {@code true} if local node is coordinator. 
+ */ + private boolean isLocalNodeCoordinator() { + synchronized (mux) { + boolean crd = spiState == CONNECTED && locNode.equals(resolveCoordinator()); + + if (crd) + adapter.stats.onBecomingCoordinator(); + + return crd; + } + } + + /** + * @return Spi state copy. + */ + private TcpDiscoverySpiState spiStateCopy() { + TcpDiscoverySpiState state; + + synchronized (mux) { + state = spiState; + } + + return state; + } + + /** + * Resolves coordinator. Nodes that are leaving or failed (but are still in + * topology) are removed from search. + * + * @return Coordinator node or {@code null} if there are no coordinator + * (i.e. local node is the last one and is currently stopping). + */ + @Nullable private TcpDiscoveryNode resolveCoordinator() { + return resolveCoordinator(null); + } + + /** + * Resolves coordinator. Nodes that are leaving or failed (but are still in + * topology) are removed from search as well as provided filter. + * + * @param filter Nodes to exclude when resolving coordinator (optional). + * @return Coordinator node or {@code null} if there are no coordinator + * (i.e. local node is the last one and is currently stopping). + */ + @Nullable private TcpDiscoveryNode resolveCoordinator( + @Nullable Collection<TcpDiscoveryNode> filter) { + synchronized (mux) { + Collection<TcpDiscoveryNode> excluded = F.concat(false, failedNodes, leavingNodes); + + if (!F.isEmpty(filter)) + excluded = F.concat(false, excluded, filter); + + return ring.coordinator(excluded); + } + } + + /** + * Prints SPI statistics. 
+ */ + private void printStatistics() { + if (log.isInfoEnabled() && adapter.statsPrintFreq > 0) { + int failedNodesSize; + int leavingNodesSize; + + synchronized (mux) { + failedNodesSize = failedNodes.size(); + leavingNodesSize = leavingNodes.size(); + } + + Runtime runtime = Runtime.getRuntime(); + + TcpDiscoveryNode coord = resolveCoordinator(); + + log.info("Discovery SPI statistics [statistics=" + adapter.stats + ", spiState=" + spiStateCopy() + + ", coord=" + coord + + ", topSize=" + ring.allNodes().size() + + ", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize + + ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() : "N/A") + + ", lastUpdate=" + (locNode != null ? U.format(locNode.lastUpdateTime()) : "N/A") + + ", heapFree=" + runtime.freeMemory() / (1024 * 1024) + + "M, heapTotal=" + runtime.maxMemory() / (1024 * 1024) + "M]"); + } + } + + /** + * @param msg Message to prepare. + * @param destNodeId Destination node ID. + * @param msgs Messages to include. + * @param discardMsgId Discarded message ID. + */ + private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId, + @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) { + assert destNodeId != null; + + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; + + TcpDiscoveryNode node = nodeAddedMsg.node(); + + if (node.id().equals(destNodeId)) { + Collection<TcpDiscoveryNode> allNodes = ring.allNodes(); + Collection<TcpDiscoveryNode> topToSend = new ArrayList<>(allNodes.size()); + + for (TcpDiscoveryNode n0 : allNodes) { + assert n0.internalOrder() != 0 : n0; + + // Skip next node and nodes added after next + // in case this message is resent due to failures/leaves. + // There will be separate messages for nodes with greater + // internal order. 
+ if (n0.internalOrder() < nodeAddedMsg.node().internalOrder()) + topToSend.add(n0); + } + + nodeAddedMsg.topology(topToSend); + nodeAddedMsg.messages(msgs, discardMsgId); + + Map<Long, Collection<ClusterNode>> hist; + + synchronized (mux) { + hist = new TreeMap<>(topHist); + } + + nodeAddedMsg.topologyHistory(hist); + } + } + } + + /** + * @param msg Message to clear. + */ + private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + // Nullify topology before registration. + TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; + + nodeAddedMsg.topology(null); + nodeAddedMsg.topologyHistory(null); + nodeAddedMsg.messages(null, null); + } + } + + /** {@inheritDoc} */ + @Override void simulateNodeFailure() { + U.warn(log, "Simulating node failure: " + getLocalNodeId()); + + U.interrupt(tcpSrvr); + U.join(tcpSrvr, log); + + U.interrupt(hbsSnd); + U.join(hbsSnd, log); + + U.interrupt(chkStatusSnd); + U.join(chkStatusSnd, log); + + U.interrupt(ipFinderCleaner); + U.join(ipFinderCleaner, log); + + Collection<SocketReader> tmp; + + synchronized (mux) { + tmp = U.arrayList(readers); + } + + U.interrupt(tmp); + U.joinThreads(tmp, log); + + U.interrupt(msgWorker); + U.join(msgWorker, log); + + U.interrupt(statsPrinter); + U.join(statsPrinter, log); + } + + /** {@inheritDoc} */ + @Override public void brakeConnection() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override protected IgniteSpiThread workerThread() { + return msgWorker; + } + + /** + * <strong>FOR TEST ONLY!!!</strong> + * <p> + * Simulates situation when next node is still alive but is bypassed + * since it has been excluded from the ring, possibly, due to short time + * network problems. + * <p> + * This method is intended for test purposes only. 
+ */ + void forceNextNodeFailure() { + U.warn(log, "Next node will be forcibly failed (if any)."); + + TcpDiscoveryNode next; + + synchronized (mux) { + next = ring.nextNode(failedNodes); + } + + if (next != null) + msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), next.id(), + next.internalOrder())); + } + + /** + * <strong>FOR TEST ONLY!!!</strong> + * <p> + * This method is intended for test purposes only. + * + * @return Nodes ring. + */ + TcpDiscoveryNodesRing ring() { + return ring; + } + + /** {@inheritDoc} */ + public void dumpDebugInfo(IgniteLogger log) { + if (!debugMode) { + U.quietAndWarn(log, "Failed to dump debug info (discovery SPI was not configured " + + "in debug mode, consider setting 'debugMode' configuration property to 'true')."); + + return; + } + + assert log.isInfoEnabled(); + + synchronized (mux) { + StringBuilder b = new StringBuilder(U.nl()); + + b.append(">>>").append(U.nl()); + b.append(">>>").append("Dumping discovery SPI debug info.").append(U.nl()); + b.append(">>>").append(U.nl()); + + b.append("Local node ID: ").append(getLocalNodeId()).append(U.nl()).append(U.nl()); + b.append("Local node: ").append(locNode).append(U.nl()).append(U.nl()); + b.append("SPI state: ").append(spiState).append(U.nl()).append(U.nl()); + + b.append("Internal threads: ").append(U.nl()); + + b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); + b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl()); + b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl()); + b.append(" Socket timeout worker: ").append(threadStatus(adapter.sockTimeoutWorker)).append(U.nl()); + b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl()); + b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl()); + + b.append(U.nl()); + + b.append("Socket readers: ").append(U.nl()); + + for (SocketReader rdr : readers) + b.append(" 
").append(rdr).append(U.nl()); + + b.append(U.nl()); + + b.append("In-memory log messages: ").append(U.nl()); + + for (String msg : debugLog) + b.append(" ").append(msg).append(U.nl()); + + b.append(U.nl()); + + b.append("Leaving nodes: ").append(U.nl()); + + for (TcpDiscoveryNode node : leavingNodes) + b.append(" ").append(node.id()).append(U.nl()); + + b.append(U.nl()); + + b.append("Failed nodes: ").append(U.nl()); + + for (TcpDiscoveryNode node : failedNodes) + b.append(" ").append(node.id()).append(U.nl()); + + b.append(U.nl()); + + b.append("Stats: ").append(adapter.stats).append(U.nl()); + + U.quietAndInfo(log, b.toString()); + } + } + + /** + * @param msg Message. + */ + private void debugLog(String msg) { + assert debugMode; + + String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) + + '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() + + "-" + locNode.internalOrder() + "] " + + msg; + + debugLog.add(msg0); + + int delta = debugLog.size() - debugMsgHist; + + for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++) + debugLog.poll(); + } + + /** + * @param msg Message. + * @return {@code True} if recordable in debug mode. + */ + private boolean recordable(TcpDiscoveryAbstractMessage msg) { + return !(msg instanceof TcpDiscoveryHeartbeatMessage) && + !(msg instanceof TcpDiscoveryStatusCheckMessage) && + !(msg instanceof TcpDiscoveryDiscardMessage); + } + + /** + * Checks if two given {@link SecurityPermissionSet} objects contain the same permissions. + * Each permission belongs to one of three groups : cache, task or system. + * + * @param locPerms The first set of permissions. + * @param rmtPerms The second set of permissions. + * @return {@code True} if given parameters contain the same permissions, {@code False} otherwise. 
+ */ + private boolean permissionsEqual(SecurityPermissionSet locPerms, SecurityPermissionSet rmtPerms) { + boolean dfltAllowMatch = !(locPerms.defaultAllowAll() ^ rmtPerms.defaultAllowAll()); + + boolean bothHaveSamePerms = F.eqNotOrdered(rmtPerms.systemPermissions(), locPerms.systemPermissions()) && + F.eqNotOrdered(rmtPerms.cachePermissions(), locPerms.cachePermissions()) && + F.eqNotOrdered(rmtPerms.taskPermissions(), locPerms.taskPermissions()); + + return dfltAllowMatch && bothHaveSamePerms; + } + + /** + * @param msg Message. + * @param nodeId Node ID. + */ + private static void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) { + msg.removeMetrics(nodeId); + msg.removeCacheMetrics(nodeId); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ServerImpl.class, this); + } + + /** + * Thread that sends heartbeats. + */ + private class HeartbeatsSender extends IgniteSpiThread { + /** + * Constructor. + */ + private HeartbeatsSender() { + super(adapter.ignite().name(), "tcp-disco-hb-sender", log); + + setPriority(adapter.threadPri); + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override protected void body() throws InterruptedException { + while (!isLocalNodeCoordinator()) + Thread.sleep(1000); + + if (log.isDebugEnabled()) + log.debug("Heartbeats sender has been started."); + + while (!isInterrupted()) { + if (spiStateCopy() != CONNECTED) { + if (log.isDebugEnabled()) + log.debug("Stopping heartbeats sender (SPI is not connected to topology)."); + + return; + } + + TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getLocalNodeId()); + + msg.verify(getLocalNodeId()); + + msgWorker.addMessage(msg); + + Thread.sleep(adapter.hbFreq); + } + } + } + + /** + * Thread that sends status check messages to next node if local node has not + * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage}) + * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} * + * {@link 
TcpDiscoverySpi#getHeartbeatFrequency()}. + */ + private class CheckStatusSender extends IgniteSpiThread { + /** + * Constructor. + */ + private CheckStatusSender() { + super(adapter.ignite().name(), "tcp-disco-status-check-sender", log); + + setPriority(adapter.threadPri); + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override protected void body() throws InterruptedException { + if (log.isDebugEnabled()) + log.debug("Status check sender has been started."); + + // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm. + long checkTimeout = (long)adapter.maxMissedHbs * adapter.hbFreq + 50; + + long lastSent = 0; + + while (!isInterrupted()) { + // 1. Determine timeout. + if (lastSent < locNode.lastUpdateTime()) + lastSent = locNode.lastUpdateTime(); + + long timeout = (lastSent + checkTimeout) - U.currentTimeMillis(); + + if (timeout > 0) + Thread.sleep(timeout); + + // 2. Check if SPI is still connected. + if (spiStateCopy() != CONNECTED) { + if (log.isDebugEnabled()) + log.debug("Stopping status check sender (SPI is not connected to topology)."); + + return; + } + + // 3. Was there an update? + if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) { + if (log.isDebugEnabled()) + log.debug("Skipping status check send " + + "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) + + ", hasRmts=" + ring.hasRemoteNodes() + ']'); + + continue; + } + + // 4. Send status check message. + lastSent = U.currentTimeMillis(); + + msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); + } + } + } + + /** + * Thread that cleans IP finder and keeps it in the correct state, unregistering + * addresses of the nodes that has left the topology. + * <p> + * This thread should run only on coordinator node and will clean IP finder + * if and only if {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()} is {@code true}. 
+ */ + private class IpFinderCleaner extends IgniteSpiThread { + /** + * Constructor. + */ + private IpFinderCleaner() { + super(adapter.ignite().name(), "tcp-disco-ip-finder-cleaner", log); + + setPriority(adapter.threadPri); + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override protected void body() throws InterruptedException { + if (log.isDebugEnabled()) + log.debug("IP finder cleaner has been started."); + + while (!isInterrupted()) { + Thread.sleep(adapter.ipFinderCleanFreq); + + if (!isLocalNodeCoordinator()) + continue; + + if (spiStateCopy() != CONNECTED) { + if (log.isDebugEnabled()) + log.debug("Stopping IP finder cleaner (SPI is not connected to topology)."); + + return; + } + + if (adapter.ipFinder.isShared()) + cleanIpFinder(); + } + } + + /** + * Cleans IP finder. + */ + private void cleanIpFinder() { + assert adapter.ipFinder.isShared(); + + try { + // Addresses that belongs to nodes in topology. + Collection<InetSocketAddress> currAddrs = F.flatCollections( + F.viewReadOnly( + ring.allNodes(), + new C1<TcpDiscoveryNode, Collection<InetSocketAddress>>() { + @Override public Collection<InetSocketAddress> apply(TcpDiscoveryNode node) { + return !node.isClient() ? adapter.getNodeAddresses(node) : + Collections.<InetSocketAddress>emptyList(); + } + } + ) + ); + + // Addresses registered in IP finder. + Collection<InetSocketAddress> regAddrs = adapter.registeredAddresses(); + + // Remove all addresses that belong to alive nodes, leave dead-node addresses. 
+ Collection<InetSocketAddress> rmvAddrs = F.view( + regAddrs, + F.notContains(currAddrs), + new P1<InetSocketAddress>() { + private final Map<InetSocketAddress, Boolean> pingResMap = + new HashMap<>(); + + @Override public boolean apply(InetSocketAddress addr) { + Boolean res = pingResMap.get(addr); + + if (res == null) { + try { + res = pingNode(addr, null).get1() != null; + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to ping node [addr=" + addr + + ", err=" + e.getMessage() + ']'); + + res = false; + } + finally { + pingResMap.put(addr, res); + } + } + + return !res; + } + } + ); + + // Unregister dead-nodes addresses. + if (!rmvAddrs.isEmpty()) { + adapter.ipFinder.unregisterAddresses(rmvAddrs); + + if (log.isDebugEnabled()) + log.debug("Unregistered addresses from IP finder: " + rmvAddrs); + } + + // Addresses that were removed by mistake (e.g. on segmentation). + Collection<InetSocketAddress> missingAddrs = F.view( + currAddrs, + F.notContains(regAddrs) + ); + + // Re-register missing addresses. + if (!missingAddrs.isEmpty()) { + adapter.ipFinder.registerAddresses(missingAddrs); + + if (log.isDebugEnabled()) + log.debug("Registered missing addresses in IP finder: " + missingAddrs); + } + } + catch (IgniteSpiException e) { + LT.error(log, e, "Failed to clean IP finder up."); + } + } + } + + /** + * Pending messages container. + */ + private static class PendingMessages { + /** */ + private static final int MAX = 1024; + + /** Pending messages. */ + private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2); + + /** Discarded message ID. */ + private IgniteUuid discardId; + + /** + * Adds pending message and shrinks queue if it exceeds limit + * (messages that were not discarded yet are never removed). + * + * @param msg Message to add. 
+ */ + void add(TcpDiscoveryAbstractMessage msg) { + msgs.add(msg); + + while (msgs.size() > MAX) { + TcpDiscoveryAbstractMessage polled = msgs.poll(); + + assert polled != null; + + if (polled.id().equals(discardId)) + break; + } + } + + /** + * Gets messages starting from provided ID (exclusive). If such + * message is not found, {@code null} is returned (this indicates + * a failure condition when it was already removed from queue). + * + * @param lastMsgId Last message ID. + * @return Collection of messages. + */ + @Nullable Collection<TcpDiscoveryAbstractMessage> messages(IgniteUuid lastMsgId) { + assert lastMsgId != null; + + Collection<TcpDiscoveryAbstractMessage> copy = new ArrayList<>(msgs.size()); + + boolean skip = true; + + for (TcpDiscoveryAbstractMessage msg : msgs) { + if (skip) { + if (msg.id().equals(lastMsgId)) + skip = false; + } + else + copy.add(msg); + } + + return !skip ? copy : null; + } + + /** + * Resets pending messages. + * + * @param msgs Message. + * @param discardId Discarded message ID. + */ + void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) { + this.msgs.clear(); + + if (msgs != null) + this.msgs.addAll(msgs); + + this.discardId = discardId; + } + + /** + * Clears pending messages. + */ + void clear() { + msgs.clear(); + + discardId = null; + } + + /** + * Discards message with provided ID and all before it. + * + * @param id Discarded message ID. + */ + void discard(IgniteUuid id) { + discardId = id; + } + } + + /** + * Message worker thread for messages processing. + */ + private class RingMessageWorker extends MessageWorkerAdapter { + /** Next node. */ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) + private TcpDiscoveryNode next; + + /** Pending messages. */ + private final PendingMessages pendingMsgs = new PendingMessages(); + + /** Last message that updated topology. */ + private TcpDiscoveryAbstractMessage lastMsg; + + /** Force pending messages send. 
*/ + private boolean forceSndPending; + + /** Socket. */ + private Socket sock; + + /** + */ + protected RingMessageWorker() { + super("tcp-disco-msg-worker"); + } + + /** + * @param msg Message to process. + */ + @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + if (log.isDebugEnabled()) + log.debug("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); + + if (debugMode) + debugLog("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); + + adapter.stats.onMessageProcessingStarted(msg); + + if (msg instanceof TcpDiscoveryJoinRequestMessage) + processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg); + + else if (msg instanceof TcpDiscoveryClientReconnectMessage) + processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg); + + else if (msg instanceof TcpDiscoveryNodeAddedMessage) + processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); + + else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) + processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg); + + else if (msg instanceof TcpDiscoveryNodeLeftMessage) + processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg); + + else if (msg instanceof TcpDiscoveryNodeFailedMessage) + processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg); + + else if (msg instanceof TcpDiscoveryClientHeartbeatMessage) + processClientHeartbeatMessage((TcpDiscoveryClientHeartbeatMessage)msg); + + else if (msg instanceof TcpDiscoveryHeartbeatMessage) + processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg); + + else if (msg instanceof TcpDiscoveryStatusCheckMessage) + processStatusCheckMessage((TcpDiscoveryStatusCheckMessage)msg); + + else if (msg instanceof TcpDiscoveryDiscardMessage) + processDiscardMessage((TcpDiscoveryDiscardMessage)msg); + + else if (msg instanceof TcpDiscoveryCustomEventMessage) + processCustomMessage((TcpDiscoveryCustomEventMessage)msg); + + else if (msg instanceof 
TcpDiscoveryClientPingRequest) + processClientPingRequest((TcpDiscoveryClientPingRequest)msg); + + else + assert false : "Unknown message type: " + msg.getClass().getSimpleName(); + + adapter.stats.onMessageProcessingFinished(msg); + } + + /** + * Sends message across the ring. + * + * @param msg Message to send + */ + @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"}) + private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) { + assert msg != null; + + assert ring.hasRemoteNodes(); + + for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : adapter.sendMsgLsnrs) + msgLsnr.apply(msg); + + if (redirectToClients(msg)) { + byte[] marshalledMsg = null; + + for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { + // Send a clone to client to avoid ConcurrentModificationException + TcpDiscoveryAbstractMessage msgClone; + + try { + if (marshalledMsg == null) + marshalledMsg = adapter.marsh.marshal(msg); + + msgClone = adapter.marsh.unmarshal(marshalledMsg, null); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal message: " + msg, e); + + msgClone = msg; + } + + clientMsgWorker.addMessage(msgClone); + } + } + + Collection<TcpDiscoveryNode> failedNodes; + + TcpDiscoverySpiState state; + + synchronized (mux) { + failedNodes = U.arrayList(ServerImpl.this.failedNodes); + + state = spiState; + } + + Collection<Throwable> errs = null; + + boolean sent = false; + + boolean searchNext = true; + + UUID locNodeId = getLocalNodeId(); + + while (true) { + if (searchNext) { + TcpDiscoveryNode newNext = ring.nextNode(failedNodes); + + if (newNext == null) { + if (log.isDebugEnabled()) + log.debug("No next node in topology."); + + if (debugMode) + debugLog("No next node in topology."); + + if (ring.hasRemoteNodes()) { + msg.senderNodeId(locNodeId); + + addMessage(msg); + } + + break; + } + + if (!newNext.equals(next)) { + if (log.isDebugEnabled()) + log.debug("New next node [newNext=" 
+ newNext + ", formerNext=" + next + + ", ring=" + ring + ", failedNodes=" + failedNodes + ']'); + + if (debugMode) + debugLog("New next node [newNext=" + newNext + ", formerNext=" + next + + ", ring=" + ring + ", failedNodes=" + failedNodes + ']'); + + U.closeQuiet(sock); + + sock = null; + + next = newNext; + } + else if (log.isDebugEnabled()) + log.debug("Next node remains the same [nextId=" + next.id() + + ", nextOrder=" + next.internalOrder() + ']'); + } + + // Flag that shows whether next node exists and accepts incoming connections. + boolean nextNodeExists = sock != null; + + final boolean sameHost = U.sameMacs(locNode, next); + + List<InetSocketAddress> localNodeAddresses = U.arrayList(locNode.socketAddresses()); + + addr: for (InetSocketAddress addr : adapter.getNodeAddresses(next, sameHost)) { + long ackTimeout0 = adapter.ackTimeout; + + if (localNodeAddresses.contains(addr)){ + if (log.isDebugEnabled()) + log.debug("Skip to send message to the local node (probably remote node has the same " + + "loopback address that local node): " + addr); + + continue; + } + + for (int i = 0; i < adapter.reconCnt; i++) { + if (sock == null) { + nextNodeExists = false; + + boolean success = false; + + boolean openSock = false; + + // Restore ring. + try { + long tstamp = U.currentTimeMillis(); + + sock = adapter.openSocket(addr); + + openSock = true; + + // Handshake. + writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); + + TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, ackTimeout0); + + if (locNodeId.equals(res.creatorNodeId())) { + if (log.isDebugEnabled()) + log.debug("Handshake response from local node: " + res); + + U.closeQuiet(sock); + + sock = null; + + break; + } + + adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); + + UUID nextId = res.creatorNodeId(); + + long nextOrder = res.order(); + + if (!next.id().equals(nextId)) { + // Node with different ID has bounded to the same port. 
+ if (log.isDebugEnabled()) + log.debug("Failed to restore ring because next node ID received is not as " + + "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']'); + + if (debugMode) + debugLog("Failed to restore ring because next node ID received is not as " + + "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']'); + + break; + } + else { + // ID is as expected. Check node order. + if (nextOrder != next.internalOrder()) { + // Is next currently being added? + boolean nextNew = (msg instanceof TcpDiscoveryNodeAddedMessage && + ((TcpDiscoveryNodeAddedMessage)msg).node().id().equals(nextId)); + + if (!nextNew) { + if (log.isDebugEnabled()) + log.debug("Failed to restore ring because next node order received " + + "is not as expected [expected=" + next.internalOrder() + + ", rcvd=" + nextOrder + ", id=" + next.id() + ']'); + + if (debugMode) + debugLog("Failed to restore ring because next node order received " + + "is not as expected [expected=" + next.internalOrder() + + ", rcvd=" + nextOrder + ", id=" + next.id() + ']'); + + break; + } + } + + if (log.isDebugEnabled()) + log.debug("Initialized connection with next node: " + next.id()); + + if (debugMode) + debugLog("Initialized connection with next node: " + next.id()); + + errs = null; + + success = true; + } + } + catch (IOException | IgniteCheckedException e) { + if (errs == null) + errs = new ArrayList<>(); + + errs.add(e); + + if (log.isDebugEnabled()) + U.error(log, "Failed to connect to next node [msg=" + msg + + ", err=" + e.getMessage() + ']', e); + + onException("Failed to connect to next node [msg=" + msg + ", err=" + e + ']', e); + + if (!openSock) + break; // Don't retry if we can not establish connection. 
+ + if (e instanceof SocketTimeoutException || + X.hasCause(e, SocketTimeoutException.class)) { + ackTimeout0 *= 2; + + if (!checkAckTimeout(ackTimeout0)) + break; + } + + continue; + } + finally { + if (!success) { + U.closeQuiet(sock); + + sock = null; + } + else + // Next node exists and accepts incoming messages. + nextNodeExists = true; + } + } + + try { + boolean failure; + + synchronized (mux) { + failure = ServerImpl.this.failedNodes.size() < failedNodes.size(); + } + + assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage; + + if (failure || forceSndPending) { + if (log.isDebugEnabled()) + log.debug("Pending messages will be sent [failure=" + failure + + ", forceSndPending=" + forceSndPending + ']'); + + if (debugMode) + debugLog("Pending messages will be sent [failure=" + failure + + ", forceSndPending=" + forceSndPending + ']'); + + boolean skip = pendingMsgs.discardId != null; + + for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) { + if (skip) { + if (pendingMsg.id().equals(pendingMsgs.discardId)) + skip = false; + + continue; + } + + long tstamp = U.currentTimeMillis(); + + prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs, + pendingMsgs.discardId); + + try { + writeToSocket(sock, pendingMsg); + } + finally { + clearNodeAddedMessage(pendingMsg); + } + + adapter.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp); + + int res = adapter.readReceipt(sock, ackTimeout0); + + if (log.isDebugEnabled()) + log.debug("Pending message has been sent to next node [msg=" + msg.id() + + ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + + ", res=" + res + ']'); + + if (debugMode) + debugLog("Pending message has been sent to next node [msg=" + msg.id() + + ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + + ", res=" + res + ']'); + } + } + + prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId); + + try { + long tstamp = U.currentTimeMillis(); + + writeToSocket(sock, msg); 
+ + adapter.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); + + int res = adapter.readReceipt(sock, ackTimeout0); + + if (log.isDebugEnabled()) + log.debug("Message has been sent to next node [msg=" + msg + + ", next=" + next.id() + + ", res=" + res + ']'); + + if (debugMode) + debugLog("Message has been sent to next node [msg=" + msg + + ", next=" + next.id() + + ", res=" + res + ']'); + } + finally { + clearNodeAddedMessage(msg); + } + + registerPendingMessage(msg); + + sent = true; + + break addr; + } + catch (IOException | IgniteCheckedException e) { + if (errs == null) + errs = new ArrayList<>(); + + errs.add(e); + + if (log.isDebugEnabled()) + U.error(log, "Failed to send message to next node [next=" + next.id() + ", msg=" + msg + + ", err=" + e + ']', e); + + onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']', + e); + + if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { + ackTimeout0 *= 2; + + if (!checkAckTimeout(ackTimeout0)) + break; + } + } + finally { + forceSndPending = false; + + if (!sent) { + U.closeQuiet(sock); + + sock = null; + + if (log.isDebugEnabled()) + log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg + + ", i=" + i + ']'); + } + } + } // Try to reconnect. + } // Iterating node's addresses. + + if (!sent) { + if (!failedNodes.contains(next)) { + failedNodes.add(next); + + if (state == CONNECTED) { + Exception err = errs != null ? + U.exceptionWithSuppressed("Failed to send message to next node [msg=" + msg + + ", next=" + U.toShortString(next) + ']', errs) : + null; + + // If node existed on connection initialization we should check + // whether it has not gone yet. 
+ if (nextNodeExists && pingNode(next)) + U.error(log, "Failed to send message to next node [msg=" + msg + + ", next=" + next + ']', err); + else if (log.isDebugEnabled()) + log.debug("Failed to send message to next node [msg=" + msg + ", next=" + next + + ", errMsg=" + (err != null ? err.getMessage() : "N/A") + ']'); + } + } + + if (msg instanceof TcpDiscoveryStatusCheckMessage) { + TcpDiscoveryStatusCheckMessage msg0 = (TcpDiscoveryStatusCheckMessage)msg; + + if (next.id().equals(msg0.failedNodeId())) { + next = null; + + if (log.isDebugEnabled()) + log.debug("Discarding status check since next node has indeed failed [next=" + next + + ", msg=" + msg + ']'); + + // Discard status check message by exiting loop and handle failure. + break; + } + } + + next = null; + + searchNext = true; + + errs = null; + } + else + break; + } + + synchronized (mux) { + failedNodes.removeAll(ServerImpl.this.failedNodes); + } + + if (!failedNodes.isEmpty()) { + if (state == CONNECTED) { + if (!sent && log.isDebugEnabled()) + // Message has not been sent due to some problems. + log.debug("Message has not been sent: " + m
<TRUNCATED>