http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java new file mode 100644 index 0000000..955a305 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java @@ -0,0 +1,36 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery; + +import java.util.*; + +/** + * Handler for initial data exchange between GridGain nodes. Data exchange + * is initiated by a new node when it tries to join topology and finishes + * before it actually joins. + */ +public interface DiscoverySpiDataExchange { + /** + * Collects data from all components. This method is called both + * on new node that joins topology to transfer its data to existing + * nodes and on all existing nodes to transfer their data to new node. + * + * @param nodeId ID of new node that joins topology. + * @return Collection of discovery data objects from different components. + */ + public List<Object> collect(UUID nodeId); + + /** + * Notifies discovery manager about data received from remote node. + * + * @param data Collection of discovery data objects from different components. + */ + public void onExchange(List<Object> data); +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiHistorySupport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiHistorySupport.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiHistorySupport.java new file mode 100644 index 0000000..cd6ecf9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiHistorySupport.java @@ -0,0 +1,28 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery; + +import java.lang.annotation.*; + +/** + * This annotation is for all implementations of {@link DiscoverySpi} that support + * topology snapshots history. + */ +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface DiscoverySpiHistorySupport { + /** + * Whether or not target SPI supports topology snapshots history. 
+ */ + @SuppressWarnings({"JavaDoc"}) + public boolean value(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java new file mode 100644 index 0000000..acc0ee1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java @@ -0,0 +1,35 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery; + +import org.apache.ignite.cluster.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Listener for grid node discovery events. See + * {@link DiscoverySpi} for information on how grid nodes get discovered. + */ +public interface DiscoverySpiListener { + /** + * Notification for grid node discovery events. + * + * @param type Node discovery event type. See {@link org.apache.ignite.events.IgniteDiscoveryEvent} + * @param topVer Topology version or {@code 0} if configured discovery SPI implementation + * does not support versioning. + * @param node Node affected (e.g. newly joined node, left node, failed node or local node). + * @param topSnapshot Topology snapshot after the event has occurred (e.g. if event is + * {@code EVT_NODE_JOINED}, then joined node will be in snapshot). + * @param topHist Topology snapshots history. 
+ */ + public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, + @Nullable Map<Long, Collection<ClusterNode>> topHist); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiNodeAuthenticator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiNodeAuthenticator.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiNodeAuthenticator.java new file mode 100644 index 0000000..dd9fc3a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiNodeAuthenticator.java @@ -0,0 +1,39 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery; + +import org.apache.ignite.cluster.*; +import org.gridgain.grid.*; +import org.gridgain.grid.kernal.managers.security.*; +import org.gridgain.grid.security.*; + +/** + * Node authenticator. + */ +public interface DiscoverySpiNodeAuthenticator { + /** + * Authenticates the given node using the supplied security credentials. + * + * @param node Node to authenticate. + * @param cred Security credentials. + * @return Security context if authentication succeeded or {@code null} if authentication failed. + * @throws GridException If authentication process failed + * (invalid credentials should not lead to this exception). + */ + public GridSecurityContext authenticateNode(ClusterNode node, GridSecurityCredentials cred) throws GridException; + + /** + * Gets global node authentication flag. 
+ * + * @return {@code True} if all nodes in topology should authenticate joining node, {@code false} if only + * coordinator should do the authentication. + */ + public boolean isGlobalNodeAuthentication(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiOrderSupport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiOrderSupport.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiOrderSupport.java new file mode 100644 index 0000000..aee8b3f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiOrderSupport.java @@ -0,0 +1,39 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery; + +import java.lang.annotation.*; + +/** + * This annotation is for all implementations of {@link DiscoverySpi} that support + * proper node ordering. This includes: + * <ul> + * <li> + * Every node gets an order number assigned to it which is provided via {@link org.apache.ignite.cluster.ClusterNode#order()} + * method. There is no requirement about order value other than that nodes that join grid + * at later point of time have order values greater than previous nodes. + * </li> + * <li> + * All {@link org.apache.ignite.events.IgniteEventType#EVT_NODE_JOINED} events come in proper order. This means that all + * listeners to discovery events will receive discovery notifications in proper order. 
+ * </li> + * </ul> + */ +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface DiscoverySpiOrderSupport { + /** + * Whether or not target SPI supports node startup order. + */ + @SuppressWarnings({"JavaDoc"}) + public boolean value(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/package.html b/modules/core/src/main/java/org/apache/ignite/spi/discovery/package.html new file mode 100644 index 0000000..77f45f1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains APIs for topology manager SPI. 
+</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java new file mode 100644 index 0000000..b474852 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -0,0 +1,1219 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*; + +/** + * Client discovery SPI implementation that uses TCP/IP for node discovery. 
+ * <p> + * This discovery SPI requires at least one server node configured with + * {@link TcpDiscoverySpi}. It will try to connect to random IP taken from + * {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} which should point to one of these server + * nodes and will maintain connection only with this node (will not enter the ring). + * If this connection is broken, it will try to reconnect using addresses from + * the same IP finder. + */ +@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") +@IgniteSpiMultipleInstancesSupport(true) +@DiscoverySpiOrderSupport(true) +@DiscoverySpiHistorySupport(true) +public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpClientDiscoverySpiMBean { + /** Default disconnect check interval. */ + public static final long DFLT_DISCONNECT_CHECK_INT = 2000; + + /** Remote nodes. */ + private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>(); + + /** Socket. */ + private volatile Socket sock; + + /** Socket reader. */ + private volatile SocketReader sockRdr; + + /** Heartbeat sender. */ + private volatile HeartbeatSender hbSender; + + /** Disconnect handler. */ + private volatile DisconnectHandler disconnectHnd; + + /** Last message ID. */ + private volatile IgniteUuid lastMsgId; + + /** Current topology version. */ + private volatile long topVer; + + /** Join error. */ + private IgniteSpiException joinErr; + + /** Whether reconnect failed. */ + private boolean reconFailed; + + /** Joined latch. */ + private CountDownLatch joinLatch; + + /** Left latch. */ + private volatile CountDownLatch leaveLatch; + + /** Disconnect check interval. */ + private long disconnectCheckInt = DFLT_DISCONNECT_CHECK_INT; + + /** {@inheritDoc} */ + @Override public long getDisconnectCheckInterval() { + return disconnectCheckInt; + } + + /** + * Sets disconnect check interval. + * + * @param disconnectCheckInt Disconnect check interval. 
+ */ + @IgniteSpiConfiguration(optional = true) + public void setDisconnectCheckInterval(long disconnectCheckInt) { + this.disconnectCheckInt = disconnectCheckInt; + } + + /** {@inheritDoc} */ + @Override public long getSocketTimeout() { + return sockTimeout; + } + + /** {@inheritDoc} */ + @Override public long getAckTimeout() { + return ackTimeout; + } + + /** {@inheritDoc} */ + @Override public long getNetworkTimeout() { + return netTimeout; + } + + /** {@inheritDoc} */ + @Override public int getThreadPriority() { + return threadPri; + } + + /** {@inheritDoc} */ + @Override public long getHeartbeatFrequency() { + return hbFreq; + } + + /** {@inheritDoc} */ + @Override public String getIpFinderFormatted() { + return ipFinder.toString(); + } + + /** {@inheritDoc} */ + @Override public int getMessageWorkerQueueSize() { + SocketReader sockRdr0 = sockRdr; + + return sockRdr0 != null ? sockRdr0.msgWrk.queueSize() : 0; + } + + /** {@inheritDoc} */ + @Override public long getNodesJoined() { + return stats.joinedNodesCount(); + } + + /** {@inheritDoc} */ + @Override public long getNodesLeft() { + return stats.leftNodesCount(); + } + + /** {@inheritDoc} */ + @Override public long getNodesFailed() { + return stats.failedNodesCount(); + } + + /** {@inheritDoc} */ + @Override public long getAvgMessageProcessingTime() { + return stats.avgMessageProcessingTime(); + } + + /** {@inheritDoc} */ + @Override public long getMaxMessageProcessingTime() { + return stats.maxMessageProcessingTime(); + } + + /** {@inheritDoc} */ + @Override public int getTotalReceivedMessages() { + return stats.totalReceivedMessages(); + } + + /** {@inheritDoc} */ + @Override public Map<String, Integer> getReceivedMessages() { + return stats.receivedMessages(); + } + + /** {@inheritDoc} */ + @Override public int getTotalProcessedMessages() { + return stats.totalProcessedMessages(); + } + + /** {@inheritDoc} */ + @Override public Map<String, Integer> getProcessedMessages() { + return 
stats.processedMessages(); + } + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { + startStopwatch(); + + assertParameter(ipFinder != null, "ipFinder != null"); + assertParameter(netTimeout > 0, "networkTimeout > 0"); + assertParameter(sockTimeout > 0, "sockTimeout > 0"); + assertParameter(ackTimeout > 0, "ackTimeout > 0"); + assertParameter(hbFreq > 0, "heartbeatFreq > 0"); + assertParameter(threadPri > 0, "threadPri > 0"); + + try { + locHost = U.resolveLocalHost(locAddr); + } + catch (IOException e) { + throw new IgniteSpiException("Unknown local address: " + locAddr, e); + } + + if (log.isDebugEnabled()) { + log.debug(configInfo("localHost", locHost.getHostAddress())); + log.debug(configInfo("threadPri", threadPri)); + log.debug(configInfo("networkTimeout", netTimeout)); + log.debug(configInfo("sockTimeout", sockTimeout)); + log.debug(configInfo("ackTimeout", ackTimeout)); + log.debug(configInfo("ipFinder", ipFinder)); + log.debug(configInfo("heartbeatFreq", hbFreq)); + } + + // Warn on odd network timeout. + if (netTimeout < 3000) + U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout); + + // Warn on odd heartbeat frequency. 
+ if (hbFreq < 2000) + U.warn(log, "Heartbeat frequency is too high (at least 2000 ms recommended): " + hbFreq); + + registerMBean(gridName, this, TcpClientDiscoverySpiMBean.class); + + try { + locHost = U.resolveLocalHost(locAddr); + } + catch (IOException e) { + throw new IgniteSpiException("Unknown local address: " + locAddr, e); + } + + if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) { + TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder); + + if (mcastIpFinder.getLocalAddress() == null) + mcastIpFinder.setLocalAddress(locAddr); + } + + IgniteBiTuple<Collection<String>, Collection<String>> addrs; + + try { + addrs = U.resolveLocalAddresses(locHost); + } + catch (IOException | GridException e) { + throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e); + } + + locNode = new TcpDiscoveryNode( + locNodeId, + addrs.get1(), + addrs.get2(), + 0, + metricsProvider, + locNodeVer); + + locNode.setAttributes(locNodeAttrs); + locNode.local(true); + + sockTimeoutWorker = new SocketTimeoutWorker(); + sockTimeoutWorker.start(); + + joinTopology(false); + + disconnectHnd = new DisconnectHandler(); + disconnectHnd.start(); + + if (log.isDebugEnabled()) + log.debug(startInfo()); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + rmtNodes.clear(); + + U.interrupt(disconnectHnd); + U.join(disconnectHnd, log); + + U.interrupt(hbSender); + U.join(hbSender, log); + + Socket sock0 = sock; + + sock = null; + + if (sock0 != null) { + leaveLatch = new CountDownLatch(1); + + try { + TcpDiscoveryNodeLeftMessage msg = new TcpDiscoveryNodeLeftMessage(locNodeId); + + msg.client(true); + + writeToSocket(sock0, msg); + + if (!U.await(leaveLatch, netTimeout, MILLISECONDS)) { + if (log.isDebugEnabled()) + log.debug("Did not receive node left message for local node (will stop anyway) [sock=" + + sock0 + ']'); + } + } + catch (IOException | GridException e) { + if 
(log.isDebugEnabled()) + U.error(log, "Failed to send node left message (will stop anyway) [sock=" + sock0 + ']', e); + } + finally { + U.closeQuiet(sock0); + } + } + + U.interrupt(sockRdr); + U.join(sockRdr, log); + + U.interrupt(sockTimeoutWorker); + U.join(sockTimeoutWorker, log); + + unregisterMBean(); + + if (log.isDebugEnabled()) + log.debug(stopInfo()); + } + + /** {@inheritDoc} */ + @Override public Collection<Object> injectables() { + return Arrays.<Object>asList(ipFinder); + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> getRemoteNodes() { + return F.view(U.<TcpDiscoveryNode, ClusterNode>arrayList(rmtNodes.values(), new P1<TcpDiscoveryNode>() { + @Override public boolean apply(TcpDiscoveryNode node) { + return node.visible(); + } + })); + } + + /** {@inheritDoc} */ + @Nullable @Override public ClusterNode getNode(UUID nodeId) { + if (locNodeId.equals(nodeId)) + return locNode; + + TcpDiscoveryNode node = rmtNodes.get(nodeId); + + return node != null && node.visible() ? node : null; + } + + /** {@inheritDoc} */ + @Override public boolean pingNode(UUID nodeId) { + assert nodeId != null; + + if (nodeId.equals(locNodeId)) + return true; + + TcpDiscoveryNode node = rmtNodes.get(nodeId); + + return node != null && node.visible(); + } + + /** {@inheritDoc} */ + @Override public void disconnect() throws IgniteSpiException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) { + // No-op. + } + + /** + * @param recon Reconnect flag. + * @return Whether joined successfully. + * @throws org.apache.ignite.spi.IgniteSpiException In case of error. 
+ */ + private boolean joinTopology(boolean recon) throws IgniteSpiException { + if (!recon) + stats.onJoinStarted(); + + Collection<InetSocketAddress> addrs = null; + + while (!Thread.currentThread().isInterrupted()) { + try { + while (addrs == null || addrs.isEmpty()) { + addrs = resolvedAddresses(); + + if (!F.isEmpty(addrs)) { + if (log.isDebugEnabled()) + log.debug("Resolved addresses from IP finder: " + addrs); + } + else { + U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + ipFinder); + + U.sleep(2000); + } + } + + Iterator<InetSocketAddress> it = addrs.iterator(); + + while (it.hasNext() && !Thread.currentThread().isInterrupted()) { + InetSocketAddress addr = it.next(); + + Socket sock = null; + + try { + long ts = U.currentTimeMillis(); + + IgniteBiTuple<Socket, UUID> t = initConnection(addr); + + sock = t.get1(); + + UUID rmtNodeId = t.get2(); + + stats.onClientSocketInitialized(U.currentTimeMillis() - ts); + + locNode.clientRouterNodeId(rmtNodeId); + + TcpDiscoveryAbstractMessage msg = recon ? 
+ new TcpDiscoveryClientReconnectMessage(locNodeId, rmtNodeId, lastMsgId) : + new TcpDiscoveryJoinRequestMessage(locNode, null); + + msg.client(true); + + writeToSocket(sock, msg); + + int res = readReceipt(sock, ackTimeout); + + switch (res) { + case RES_OK: + this.sock = sock; + + sockRdr = new SocketReader(rmtNodeId, new MessageWorker(recon)); + sockRdr.start(); + + if (U.await(joinLatch, netTimeout, MILLISECONDS)) { + IgniteSpiException joinErr0 = joinErr; + + if (joinErr0 != null) + throw joinErr0; + + if (reconFailed) { + if (log.isDebugEnabled()) + log.debug("Failed to reconnect, will try to rejoin [locNode=" + + locNode + ']'); + + U.closeQuiet(sock); + + U.interrupt(sockRdr); + U.join(sockRdr, log); + + this.sock = null; + + return false; + } + + if (log.isDebugEnabled()) + log.debug("Successfully connected to topology [sock=" + sock + ']'); + + hbSender = new HeartbeatSender(); + hbSender.start(); + + stats.onJoinFinished(); + + return true; + } + else { + U.warn(log, "Join process timed out (will try other address) [sock=" + sock + + ", timeout=" + netTimeout + ']'); + + U.closeQuiet(sock); + + U.interrupt(sockRdr); + U.join(sockRdr, log); + + it.remove(); + + break; + } + + case RES_CONTINUE_JOIN: + case RES_WAIT: + U.closeQuiet(sock); + + break; + + default: + if (log.isDebugEnabled()) + log.debug("Received unexpected response to join request: " + res); + + U.closeQuiet(sock); + } + } + catch (GridInterruptedException ignored) { + if (log.isDebugEnabled()) + log.debug("Joining thread was interrupted."); + + return false; + } + catch (IOException | GridException e) { + if (log.isDebugEnabled()) + U.error(log, "Failed to establish connection with address: " + addr, e); + + U.closeQuiet(sock); + + it.remove(); + } + } + + if (addrs.isEmpty()) { + U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " + + "in 2000ms): " + addrs); + + U.sleep(2000); + } + } + catch (GridInterruptedException ignored) { + if 
(log.isDebugEnabled()) + log.debug("Joining thread was interrupted."); + } + } + + return false; + } + + /** + * @param addr Address. + * @return Tuple of the opened socket and the remote node ID reported in the handshake response. + * @throws IOException In case of I/O error. + * @throws GridException In case of other error. + */ + private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, GridException { + assert addr != null; + + joinLatch = new CountDownLatch(1); + + Socket sock = openSocket(addr); + + TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId); + + req.client(true); + + writeToSocket(sock, req); + + TcpDiscoveryHandshakeResponse res = readMessage(sock, null, ackTimeout); + + UUID nodeId = res.creatorNodeId(); + + assert nodeId != null; + assert !locNodeId.equals(nodeId); + + return F.t(sock, nodeId); + } + + /** + * FOR TEST PURPOSE ONLY! + */ + void simulateNodeFailure() { + U.warn(log, "Simulating client node failure: " + locNodeId); + + U.closeQuiet(sock); + + U.interrupt(disconnectHnd); + U.join(disconnectHnd, log); + + U.interrupt(hbSender); + U.join(hbSender, log); + + U.interrupt(sockRdr); + U.join(sockRdr, log); + + U.interrupt(sockTimeoutWorker); + U.join(sockTimeoutWorker, log); + } + + /** + * Disconnect handler. + */ + private class DisconnectHandler extends IgniteSpiThread { + /** + */ + protected DisconnectHandler() { + super(gridName, "tcp-client-disco-disconnect-hnd", log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + while (!isInterrupted()) { + try { + U.sleep(disconnectCheckInt); + + if (sock == null) { + if (log.isDebugEnabled()) + log.debug("Node is disconnected from topology, will try to reconnect."); + + U.interrupt(hbSender); + U.join(hbSender, log); + + U.interrupt(sockRdr); + U.join(sockRdr, log); + + // If reconnection fails, try to rejoin. 
+ if (!joinTopology(true)) { + rmtNodes.clear(); + + locNode.order(0); + + joinTopology(false); + + getSpiContext().recordEvent(new IgniteDiscoveryEvent(locNode, + "Client node reconnected: " + locNode, + EVT_CLIENT_NODE_RECONNECTED, locNode)); + } + } + } + catch (GridInterruptedException ignored) { + if (log.isDebugEnabled()) + log.debug("Disconnect handler was interrupted."); + + return; + } + catch (IgniteSpiException e) { + U.error(log, "Failed to reconnect to topology after failure.", e); + } + } + } + } + + /** + * Heartbeat sender. + */ + private class HeartbeatSender extends IgniteSpiThread { + /** + */ + protected HeartbeatSender() { + super(gridName, "tcp-client-disco-heartbeat-sender", log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + Socket sock0 = sock; + + if (sock0 == null) { + if (log.isDebugEnabled()) + log.debug("Failed to start heartbeat sender, node is already disconnected."); + + return; + } + + try { + while (!isInterrupted()) { + U.sleep(hbFreq); + + TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(locNodeId); + + msg.client(true); + + sockRdr.addMessage(msg); + } + } + catch (GridInterruptedException ignored) { + if (log.isDebugEnabled()) + log.debug("Heartbeat sender was interrupted."); + } + } + } + + /** + * Socket reader. + */ + private class SocketReader extends IgniteSpiThread { + /** Remote node ID. */ + private final UUID nodeId; + + /** Message worker. */ + private final MessageWorker msgWrk; + + /** + * @param nodeId Node ID. + * @param msgWrk Message worker. 
+ */ + protected SocketReader(UUID nodeId, MessageWorker msgWrk) { + super(gridName, "tcp-client-disco-sock-reader", log); + + assert nodeId != null; + assert msgWrk != null; + + this.nodeId = nodeId; + this.msgWrk = msgWrk; + } + + /** {@inheritDoc} */ + @Override public synchronized void start() { + super.start(); + + msgWrk.start(); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + Socket sock0 = sock; + + if (sock0 == null) { + if (log.isDebugEnabled()) + log.debug("Failed to start socket reader, node is already disconnected."); + + return; + } + + try { + InputStream in = new BufferedInputStream(sock0.getInputStream()); + + sock0.setKeepAlive(true); + sock0.setTcpNoDelay(true); + + while (!isInterrupted()) { + try { + TcpDiscoveryAbstractMessage msg = marsh.unmarshal(in, U.gridClassLoader()); + + msg.senderNodeId(nodeId); + + if (log.isDebugEnabled()) + log.debug("Message has been received: " + msg); + + stats.onMessageReceived(msg); + + IgniteSpiException err = null; + + if (joinLatch.getCount() > 0) { + if (msg instanceof TcpDiscoveryDuplicateIdMessage) + err = duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg); + else if (msg instanceof TcpDiscoveryAuthFailedMessage) + err = authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg); + else if (msg instanceof TcpDiscoveryCheckFailedMessage) + err = checkFailedError((TcpDiscoveryCheckFailedMessage)msg); + + if (err != null) { + joinErr = err; + + joinLatch.countDown(); + + return; + } + } + + msgWrk.addMessage(msg); + } + catch (GridException e) { + if (log.isDebugEnabled()) + U.error(log, "Failed to read message [sock=" + sock0 + ", locNodeId=" + locNodeId + + ", rmtNodeId=" + nodeId + ']', e); + + IOException ioEx = X.cause(e, IOException.class); + + if (ioEx != null) + throw ioEx; + + ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class); + + if (clsNotFoundEx != null) + LT.warn(log, null, "Failed to read message due to 
ClassNotFoundException " + + "(make sure same versions of all classes are available on all nodes) " + + "[rmtNodeId=" + nodeId + ", err=" + clsNotFoundEx.getMessage() + ']'); + else + LT.error(log, e, "Failed to read message [sock=" + sock0 + ", locNodeId=" + locNodeId + + ", rmtNodeId=" + nodeId + ']'); + } + } + } + catch (IOException e) { + if (log.isDebugEnabled()) + U.error(log, "Connection failed [sock=" + sock0 + ", locNodeId=" + locNodeId + + ", rmtNodeId=" + nodeId + ']', e); + } + finally { + U.closeQuiet(sock0); + + U.interrupt(msgWrk); + + try { + U.join(msgWrk); + } + catch (GridInterruptedException ignored) { + // No-op. + } + + sock = null; + } + } + + /** + * @param msg Message. + */ + void addMessage(TcpDiscoveryAbstractMessage msg) { + assert msg != null; + + msgWrk.addMessage(msg); + } + } + + /** + * Message worker. + */ + private class MessageWorker extends MessageWorkerAdapter { + /** Topology history. */ + private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>(); + + /** Indicates that reconnection is in progress. */ + private boolean recon; + + /** Indicates that pending messages are currently processed. */ + private boolean pending; + + /** + * @param recon Whether reconnection is in progress. 
+ */ + protected MessageWorker(boolean recon) { + super("tcp-client-disco-msg-worker"); + + this.recon = recon; + } + + /** {@inheritDoc} */ + @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + assert msg != null; + assert msg.verified() || msg.senderNodeId() == null; + + stats.onMessageProcessingStarted(msg); + + if (msg instanceof TcpDiscoveryClientReconnectMessage) + processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg); + else { + if (recon && !pending) { + if (log.isDebugEnabled()) + log.debug("Discarding message received during reconnection: " + msg); + } + else { + if (msg instanceof TcpDiscoveryNodeAddedMessage) + processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); + else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) + processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg); + else if (msg instanceof TcpDiscoveryNodeLeftMessage) + processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg); + else if (msg instanceof TcpDiscoveryNodeFailedMessage) + processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg); + else if (msg instanceof TcpDiscoveryHeartbeatMessage) + processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg); + + if (ensured(msg)) + lastMsgId = msg.id(); + } + } + + stats.onMessageProcessingFinished(msg); + } + + /** + * @param msg Message. 
+ */ + private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { + if (leaveLatch != null) + return; + + TcpDiscoveryNode node = msg.node(); + + UUID newNodeId = node.id(); + + if (locNodeId.equals(newNodeId)) { + if (joinLatch.getCount() > 0) { + Collection<TcpDiscoveryNode> top = msg.topology(); + + if (top != null) { + for (TcpDiscoveryNode n : top) { + if (n.order() > 0) + n.visible(true); + + rmtNodes.put(n.id(), n); + } + + topHist.clear(); + + if (msg.topologyHistory() != null) + topHist.putAll(msg.topologyHistory()); + + Collection<List<Object>> dataList = msg.oldNodesDiscoveryData(); + + if (dataList != null) { + for (List<Object> discoData : dataList) + exchange.onExchange(discoData); + } + + locNode.setAttributes(node.attributes()); + locNode.visible(true); + } + else if (log.isDebugEnabled()) + log.debug("Discarding node added message with empty topology: " + msg); + } + else if (log.isDebugEnabled()) + log.debug("Discarding node added message (this message has already been processed) " + + "[msg=" + msg + ", locNode=" + locNode + ']'); + } + else { + boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null; + + if (topChanged) { + if (log.isDebugEnabled()) + log.debug("Added new node to topology: " + node); + + List<Object> data = msg.newNodeDiscoveryData(); + + if (data != null) + exchange.onExchange(data); + } + } + } + + /** + * @param msg Message. 
+ */ + private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) { + if (leaveLatch != null) + return; + + if (locNodeId.equals(msg.nodeId())) { + if (joinLatch.getCount() > 0) { + long topVer = msg.topologyVersion(); + + locNode.order(topVer); + + notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer)); + + joinErr = null; + + joinLatch.countDown(); + } + else if (log.isDebugEnabled()) + log.debug("Discarding node add finished message (this message has already been processed) " + + "[msg=" + msg + ", locNode=" + locNode + ']'); + } + else { + TcpDiscoveryNode node = rmtNodes.get(msg.nodeId()); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']'); + + return; + } + + long topVer = msg.topologyVersion(); + + node.order(topVer); + node.visible(true); + + if (locNodeVer.equals(node.version())) + node.version(locNodeVer); + + Collection<ClusterNode> top = updateTopologyHistory(topVer); + + if (!pending && joinLatch.getCount() > 0) { + if (log.isDebugEnabled()) + log.debug("Discarding node add finished message (join process is not finished): " + msg); + + return; + } + + notifyDiscovery(EVT_NODE_JOINED, topVer, node, top); + + stats.onNodeJoined(); + } + } + + /** + * @param msg Message. 
+ */ + private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) { + if (locNodeId.equals(msg.creatorNodeId())) { + if (log.isDebugEnabled()) + log.debug("Received node left message for local node: " + msg); + + CountDownLatch leaveLatch0 = leaveLatch; + + assert leaveLatch0 != null; + + leaveLatch0.countDown(); + } + else { + if (leaveLatch != null) + return; + + TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId()); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Discarding node left message since node is not found [msg=" + msg + ']'); + + return; + } + + Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion()); + + if (!pending && joinLatch.getCount() > 0) { + if (log.isDebugEnabled()) + log.debug("Discarding node left message (join process is not finished): " + msg); + + return; + } + + notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top); + + stats.onNodeLeft(); + } + } + + /** + * @param msg Message. + */ + private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) { + if (leaveLatch != null) + return; + + if (!locNodeId.equals(msg.creatorNodeId())) { + TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId()); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Discarding node failed message since node is not found [msg=" + msg + ']'); + + return; + } + + Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion()); + + if (!pending && joinLatch.getCount() > 0) { + if (log.isDebugEnabled()) + log.debug("Discarding node failed message (join process is not finished): " + msg); + + return; + } + + notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top); + + stats.onNodeFailed(); + } + } + + /** + * @param msg Message. 
+ */ + private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) { + if (leaveLatch != null) + return; + + if (locNodeId.equals(msg.creatorNodeId())) { + if (msg.senderNodeId() == null) { + Socket sock0 = sock; + + if (sock0 != null) { + msg.setMetrics(locNodeId, metricsProvider.getMetrics()); + + try { + writeToSocket(sock0, msg); + + if (log.isDebugEnabled()) + log.debug("Heartbeat message sent [sock=" + sock0 + ", msg=" + msg + ']'); + } + catch (IOException | GridException e) { + if (log.isDebugEnabled()) + U.error(log, "Failed to send heartbeat message [sock=" + sock0 + + ", msg=" + msg + ']', e); + + U.closeQuiet(sock0); + + sock = null; + + interrupt(); + } + } + else if (log.isDebugEnabled()) + log.debug("Failed to send heartbeat message (node is disconnected): " + msg); + } + else if (log.isDebugEnabled()) + log.debug("Received heartbeat response: " + msg); + } + else { + if (msg.hasMetrics()) { + long tstamp = U.currentTimeMillis(); + + for (Map.Entry<UUID, MetricsSet> e : msg.metrics().entrySet()) { + MetricsSet metricsSet = e.getValue(); + + updateMetrics(e.getKey(), metricsSet.metrics(), tstamp); + + for (T2<UUID, ClusterNodeMetrics> t : metricsSet.clientMetrics()) + updateMetrics(t.get1(), t.get2(), tstamp); + } + } + } + } + + /** + * @param msg Message. 
+ */ + private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) { + if (leaveLatch != null) + return; + + if (locNodeId.equals(msg.creatorNodeId())) { + if (msg.success()) { + pending = true; + + try { + for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) + processMessage(pendingMsg); + } + finally { + pending = false; + } + + joinErr = null; + reconFailed = false; + + joinLatch.countDown(); + } + else { + joinErr = null; + reconFailed = true; + + getSpiContext().recordEvent(new IgniteDiscoveryEvent(locNode, + "Client node disconnected: " + locNode, + EVT_CLIENT_NODE_DISCONNECTED, locNode)); + + joinLatch.countDown(); + } + } + else if (log.isDebugEnabled()) + log.debug("Discarding reconnect message for another client: " + msg); + } + + /** + * @param nodeId Node ID. + * @param metrics Metrics. + * @param tstamp Timestamp. + */ + private void updateMetrics(UUID nodeId, ClusterNodeMetrics metrics, long tstamp) { + assert nodeId != null; + assert metrics != null; + + TcpDiscoveryNode node = nodeId.equals(locNodeId) ? locNode : rmtNodes.get(nodeId); + + if (node != null && node.visible()) { + node.setMetrics(metrics); + + node.lastUpdateTime(tstamp); + + notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allNodes()); + } + else if (log.isDebugEnabled()) + log.debug("Received metrics from unknown node: " + nodeId); + } + + /** + * @param topVer New topology version. + * @return Latest topology snapshot. 
+         */
+        private Collection<ClusterNode> updateTopologyHistory(long topVer) {
+            TcpClientDiscoverySpi.this.topVer = topVer;
+
+            Collection<ClusterNode> snapshot = allNodes();
+
+            // A version that is already recorded keeps its stored snapshot.
+            if (topHist.containsKey(topVer))
+                return snapshot;
+
+            assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
+                "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
+
+            topHist.put(topVer, snapshot);
+
+            // Drop the oldest entry once the configured history size is exceeded.
+            if (topHist.size() > topHistSize)
+                topHist.pollFirstEntry();
+
+            assert topHist.lastKey() == topVer;
+            assert topHist.size() <= topHistSize;
+
+            return snapshot;
+        }
+
+        /**
+         * Builds a sorted collection of the visible remote nodes plus the local node.
+         *
+         * @return All nodes.
+         */
+        private Collection<ClusterNode> allNodes() {
+            Collection<ClusterNode> res = new TreeSet<>();
+
+            for (TcpDiscoveryNode rmt : rmtNodes.values()) {
+                if (rmt.visible())
+                    res.add(rmt);
+            }
+
+            res.add(locNode);
+
+            return res;
+        }
+
+        /**
+         * Passes a discovery event to the SPI listener, if one is set.
+         *
+         * @param type Event type.
+         * @param topVer Topology version.
+         * @param node Node.
+         * @param top Topology snapshot.
+         */
+        private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top) {
+            DiscoverySpiListener curLsnr = TcpClientDiscoverySpi.this.lsnr;
+
+            if (curLsnr == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
+                        ", topVer=" + topVer + ']');
+
+                return;
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
+                    ", topVer=" + topVer + ']');
+
+            // The listener receives its own copy of the history map.
+            curLsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java new
file mode 100644 index 0000000..95c7fb9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java @@ -0,0 +1,156 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.mbean.*; +import org.apache.ignite.spi.*; + +import java.util.*; + +/** + * Management bean for {@link TcpClientDiscoverySpi}. + */ +public interface TcpClientDiscoverySpiMBean extends IgniteSpiManagementMBean { + /** + * Gets disconnect check interval. + * + * @return Disconnect check interval. + */ + @IgniteMBeanDescription("Disconnect check interval.") + public long getDisconnectCheckInterval(); + + /** + * Gets socket timeout. + * + * @return Socket timeout. + */ + @IgniteMBeanDescription("Socket timeout.") + public long getSocketTimeout(); + + /** + * Gets message acknowledgement timeout. + * + * @return Message acknowledgement timeout. + */ + @IgniteMBeanDescription("Message acknowledgement timeout.") + public long getAckTimeout(); + + /** + * Gets network timeout. + * + * @return Network timeout. + */ + @IgniteMBeanDescription("Network timeout.") + public long getNetworkTimeout(); + + /** + * Gets thread priority. All threads within SPI will be started with it. + * + * @return Thread priority. + */ + @IgniteMBeanDescription("Threads priority.") + public int getThreadPriority(); + + /** + * Gets delay between heartbeat messages sent by coordinator. + * + * @return Time period in milliseconds. + */ + @IgniteMBeanDescription("Heartbeat frequency.") + public long getHeartbeatFrequency(); + + /** + * Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation). 
+ * + * @return IPFinder (string representation). + */ + @IgniteMBeanDescription("IP Finder.") + public String getIpFinderFormatted(); + + /** + * Gets message worker queue current size. + * + * @return Message worker queue current size. + */ + @IgniteMBeanDescription("Message worker queue current size.") + public int getMessageWorkerQueueSize(); + + /** + * Gets joined nodes count. + * + * @return Nodes joined count. + */ + @IgniteMBeanDescription("Nodes joined count.") + public long getNodesJoined(); + + /** + * Gets left nodes count. + * + * @return Left nodes count. + */ + @IgniteMBeanDescription("Nodes left count.") + public long getNodesLeft(); + + /** + * Gets failed nodes count. + * + * @return Failed nodes count. + */ + @IgniteMBeanDescription("Nodes failed count.") + public long getNodesFailed(); + + /** + * Gets avg message processing time. + * + * @return Avg message processing time. + */ + @IgniteMBeanDescription("Avg message processing time.") + public long getAvgMessageProcessingTime(); + + /** + * Gets max message processing time. + * + * @return Max message processing time. + */ + @IgniteMBeanDescription("Max message processing time.") + public long getMaxMessageProcessingTime(); + + /** + * Gets total received messages count. + * + * @return Total received messages count. + */ + @IgniteMBeanDescription("Total received messages count.") + public int getTotalReceivedMessages(); + + /** + * Gets received messages counts (grouped by type). + * + * @return Map containing message types and respective counts. + */ + @IgniteMBeanDescription("Received messages by type.") + public Map<String, Integer> getReceivedMessages(); + + /** + * Gets total processed messages count. + * + * @return Total processed messages count. + */ + @IgniteMBeanDescription("Total processed messages count.") + public int getTotalProcessedMessages(); + + /** + * Gets processed messages counts (grouped by type). + * + * @return Map containing message types and respective counts. 
+ */ + @IgniteMBeanDescription("Received messages by type.") + public Map<String, Integer> getProcessedMessages(); +}