http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java new file mode 100644 index 0000000..874bd20 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java @@ -0,0 +1,639 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery.tcp.internal; + +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.gridgain.grid.util.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.grid.util.tostring.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Statistics for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}. + */ +public class TcpDiscoveryStatistics { + /** Join started timestamp. */ + private long joinStartedTs; + + /** Join finished timestamp. */ + private long joinFinishedTs; + + /** Coordinator since timestamp. */ + private final AtomicLong crdSinceTs = new AtomicLong(); + + /** Joined nodes count. */ + private int joinedNodesCnt; + + /** Failed nodes count. */ + private int failedNodesCnt; + + /** Left nodes count. */ + private int leftNodesCnt; + + /** Ack timeouts count. */ + private int ackTimeoutsCnt; + + /** Socket timeouts count. */ + private int sockTimeoutsCnt; + + /** Received messages. */ + @GridToStringInclude + private final Map<String, Integer> rcvdMsgs = new HashMap<>(); + + /** Processed messages. */ + @GridToStringInclude + private final Map<String, Integer> procMsgs = new HashMap<>(); + + /** Average time taken to serialize messages. */ + @GridToStringInclude + private final Map<String, Long> avgMsgsSndTimes = new HashMap<>(); + + /** Average time taken to serialize messages. */ + @GridToStringInclude + private final Map<String, Long> maxMsgsSndTimes = new HashMap<>(); + + /** Sent messages. */ + @GridToStringInclude + private final Map<String, Integer> sentMsgs = new HashMap<>(); + + /** Messages receive timestamps. */ + private final Map<IgniteUuid, Long> msgsRcvTs = new GridBoundedLinkedHashMap<>(1024); + + /** Messages processing start timestamps. */ + private final Map<IgniteUuid, Long> msgsProcStartTs = new GridBoundedLinkedHashMap<>(1024); + + /** Ring messages sent timestamps. */ + private final Map<IgniteUuid, Long> ringMsgsSndTs = new GridBoundedLinkedHashMap<>(1024); + + /** Average time messages is in queue. */ + private long avgMsgQueueTime; + + /** Max time messages is in queue. */ + private long maxMsgQueueTime; + + /** Total number of ring messages sent. */ + private int ringMsgsSent; + + /** Average time it takes for messages to pass the full ring. */ + private long avgRingMsgTime; + + /** Max time it takes for messages to pass the full ring. */ + private long maxRingMsgTime; + + /** Class name of ring message that required the biggest time for full ring traverse. */ + private String maxRingTimeMsgCls; + + /** Average message processing time. */ + private long avgMsgProcTime; + + /** Max message processing time. */ + private long maxMsgProcTime; + + /** Class name of the message that required the biggest time to process. */ + private String maxProcTimeMsgCls; + + /** Socket readers created count. */ + private int sockReadersCreated; + + /** Socket readers removed count. */ + private int sockReadersRmv; + + /** Average time it takes to initialize connection from another node. */ + private long avgSrvSockInitTime; + + /** Max time it takes to initialize connection from another node. */ + private long maxSrvSockInitTime; + + /** Number of outgoing connections established. */ + private int clientSockCreatedCnt; + + /** Average time it takes to connect to another node. */ + private long avgClientSockInitTime; + + /** Max time it takes to connect to another node. */ + private long maxClientSockInitTime; + + /** Pending messages registered count. */ + private int pendingMsgsRegistered; + + /** Pending messages discarded count. */ + private int pendingMsgsDiscarded; + + /** + * Increments joined nodes count. + */ + public synchronized void onNodeJoined() { + joinedNodesCnt++; + } + + /** + * Increments left nodes count. + */ + public synchronized void onNodeLeft() { + leftNodesCnt++; + } + + /** + * Increments failed nodes count. + */ + public synchronized void onNodeFailed() { + failedNodesCnt++; + } + + /** + * Increments ack timeouts count. + */ + public synchronized void onAckTimeout() { + ackTimeoutsCnt++; + } + + /** + * Increments socket timeouts count. + */ + public synchronized void onSocketTimeout() { + sockTimeoutsCnt++; + } + + /** + * Initializes coordinator since date (if needed). + */ + public void onBecomingCoordinator() { + crdSinceTs.compareAndSet(0, U.currentTimeMillis()); + } + + /** + * Initializes join started timestamp. + */ + public synchronized void onJoinStarted() { + joinStartedTs = U.currentTimeMillis(); + } + + /** + * Initializes join finished timestamp. + */ + public synchronized void onJoinFinished() { + joinFinishedTs = U.currentTimeMillis(); + } + + /** + * @return Join started timestamp. + */ + public synchronized long joinStarted() { + return joinStartedTs; + } + + /** + * @return Join finished timestamp. + */ + public synchronized long joinFinished() { + return joinFinishedTs; + } + + /** + * Collects necessary stats for message received by SPI. + * + * @param msg Received message. + */ + public synchronized void onMessageReceived(TcpDiscoveryAbstractMessage msg) { + assert msg != null; + + Integer cnt = F.addIfAbsent(rcvdMsgs, msg.getClass().getSimpleName(), new Callable<Integer>() { + @Override public Integer call() { + return 0; + } + }); + + assert cnt != null; + + rcvdMsgs.put(msg.getClass().getSimpleName(), ++cnt); + + msgsRcvTs.put(msg.id(), U.currentTimeMillis()); + } + + /** + * Collects necessary stats for message processed by SPI. + * + * @param msg Processed message. + */ + public synchronized void onMessageProcessingStarted(TcpDiscoveryAbstractMessage msg) { + assert msg != null; + + Integer cnt = F.addIfAbsent(procMsgs, msg.getClass().getSimpleName(), new Callable<Integer>() { + @Override public Integer call() { + return 0; + } + }); + + assert cnt != null; + + procMsgs.put(msg.getClass().getSimpleName(), ++cnt); + + Long rcvdTs = msgsRcvTs.remove(msg.id()); + + if (rcvdTs != null) { + long duration = U.currentTimeMillis() - rcvdTs; + + if (maxMsgQueueTime < duration) + maxMsgQueueTime = duration; + + avgMsgQueueTime = (avgMsgQueueTime * (totalReceivedMessages() -1)) / totalProcessedMessages(); + } + + msgsProcStartTs.put(msg.id(), U.currentTimeMillis()); + } + + /** + * Collects necessary stats for message processed by SPI. + * + * @param msg Processed message. + */ + public synchronized void onMessageProcessingFinished(TcpDiscoveryAbstractMessage msg) { + assert msg != null; + + Long startTs = msgsProcStartTs.get(msg.id()); + + if (startTs != null) { + long duration = U.currentTimeMillis() - startTs; + + avgMsgProcTime = (avgMsgProcTime * (totalProcessedMessages() - 1) + duration) / totalProcessedMessages(); + + if (duration > maxMsgProcTime) { + maxMsgProcTime = duration; + + maxProcTimeMsgCls = msg.getClass().getSimpleName(); + } + + msgsProcStartTs.remove(msg.id()); + } + } + + /** + * Called by coordinator when ring message is sent. + * + * @param msg Sent message. + * @param time Time taken to serialize message. + */ + public synchronized void onMessageSent(TcpDiscoveryAbstractMessage msg, long time) { + assert msg != null; + assert time >= 0; + + if (crdSinceTs.get() > 0 && + (msg instanceof TcpDiscoveryNodeAddedMessage) || + (msg instanceof TcpDiscoveryNodeLeftMessage) || + (msg instanceof TcpDiscoveryNodeFailedMessage)) { + ringMsgsSndTs.put(msg.id(), U.currentTimeMillis()); + + ringMsgsSent++; + } + + Integer cnt = F.addIfAbsent(sentMsgs, msg.getClass().getSimpleName(), new Callable<Integer>() { + @Override public Integer call() { + return 0; + } + }); + + assert cnt != null; + + sentMsgs.put(msg.getClass().getSimpleName(), ++cnt); + + Long avgTime = F.addIfAbsent(avgMsgsSndTimes, msg.getClass().getSimpleName(), new Callable<Long>() { + @Override public Long call() { + return 0L; + } + }); + + assert avgTime != null; + + avgTime = (avgTime * (cnt - 1) + time) / cnt; + + avgMsgsSndTimes.put(msg.getClass().getSimpleName(), avgTime); + + Long maxTime = F.addIfAbsent(maxMsgsSndTimes, msg.getClass().getSimpleName(), new Callable<Long>() { + @Override public Long call() { + return 0L; + } + }); + + assert maxTime != null; + + if (time > maxTime) + maxMsgsSndTimes.put(msg.getClass().getSimpleName(), time); + } + + /** + * Called by coordinator when ring message makes full pass. + * + * @param msg Message. + */ + public synchronized void onRingMessageReceived(TcpDiscoveryAbstractMessage msg) { + assert msg != null; + + Long sentTs = ringMsgsSndTs.get(msg.id()); + + if (sentTs != null) { + long duration = U.currentTimeMillis() - sentTs; + + if (maxRingMsgTime < duration) { + maxRingMsgTime = duration; + + maxRingTimeMsgCls = msg.getClass().getSimpleName(); + } + + if (ringMsgsSent != 0) + avgRingMsgTime = (avgRingMsgTime * (ringMsgsSent - 1) + duration) / ringMsgsSent; + } + } + + /** + * Gets max time for ring message to make full pass. + * + * @return Max full pass time. + */ + public synchronized long maxRingMessageTime() { + return maxRingMsgTime; + } + + /** + * Gets class name of the message that took max time to make full pass. + * + * @return Message class name. + */ + public synchronized String maxRingDurationMessageClass() { + return maxRingTimeMsgCls; + } + + /** + * Gets class name of the message took max time to process. + * + * @return Message class name. + */ + public synchronized String maxProcessingTimeMessageClass() { + return maxProcTimeMsgCls; + } + + /** + * @param initTime Time socket was initialized in. + */ + public synchronized void onServerSocketInitialized(long initTime) { + assert initTime >= 0; + + if (maxSrvSockInitTime < initTime) + maxSrvSockInitTime = initTime; + + avgSrvSockInitTime = (avgSrvSockInitTime * (sockReadersCreated - 1) + initTime) / sockReadersCreated; + } + + /** + * @param initTime Time socket was initialized in. + */ + public synchronized void onClientSocketInitialized(long initTime) { + assert initTime >= 0; + + clientSockCreatedCnt++; + + if (maxClientSockInitTime < initTime) + maxClientSockInitTime = initTime; + + avgClientSockInitTime = (avgClientSockInitTime * (clientSockCreatedCnt - 1) + initTime) / clientSockCreatedCnt; + } + + /** + * Increments pending messages registered count. + */ + public synchronized void onPendingMessageRegistered() { + pendingMsgsRegistered++; + } + + /** + * Increments pending messages discarded count. + */ + public synchronized void onPendingMessageDiscarded() { + pendingMsgsDiscarded++; + } + + /** + * Increments socket readers created count. + */ + public synchronized void onSocketReaderCreated() { + sockReadersCreated++; + } + + /** + * Increments socket readers removed count. + */ + public synchronized void onSocketReaderRemoved() { + sockReadersRmv++; + } + + /** + * Gets processed messages counts (grouped by type). + * + * @return Map containing message types and respective counts. + */ + public synchronized Map<String, Integer> processedMessages() { + return new HashMap<>(procMsgs); + } + + /** + * Gets received messages counts (grouped by type). + * + * @return Map containing message types and respective counts. + */ + public synchronized Map<String, Integer> receivedMessages() { + return new HashMap<>(rcvdMsgs); + } + + /** + * Gets max messages send time (grouped by type). + * + * @return Map containing messages types and max send times. + */ + public synchronized Map<String, Long> maxMessagesSendTimes() { + return new HashMap<>(maxMsgsSndTimes); + } + + /** + * Gets average messages send time (grouped by type). + * + * @return Map containing messages types and average send times. + */ + public synchronized Map<String, Long> avgMessagesSendTimes() { + return new HashMap<>(avgMsgsSndTimes); + } + + /** + * Gets total received messages count. + * + * @return Total received messages count. + */ + public synchronized int totalReceivedMessages() { + return F.sumInt(receivedMessages().values()); + } + + /** + * Gets total processed messages count. + * + * @return Total processed messages count. + */ + public synchronized int totalProcessedMessages() { + return F.sumInt(processedMessages().values()); + } + + /** + * Gets max message processing time. + * + * @return Max message processing time. + */ + public synchronized long maxMessageProcessingTime(){ + return maxMsgProcTime; + } + + /** + * Gets average message processing time. + * + * @return Average message processing time. + */ + public synchronized long avgMessageProcessingTime() { + return avgMsgProcTime; + } + + /** + * Gets pending messages registered count. + * + * @return Pending messages registered count. + */ + public synchronized long pendingMessagesRegistered() { + return pendingMsgsRegistered; + } + + /** + * Gets pending messages discarded count. + * + * @return Pending messages registered count. + */ + public synchronized long pendingMessagesDiscarded() { + return pendingMsgsDiscarded; + } + + /** + * Gets nodes joined count. + * + * @return Nodes joined count. + */ + public synchronized int joinedNodesCount() { + return joinedNodesCnt; + } + + /** + * Gets nodes left count. + * + * @return Nodes left count. + */ + public synchronized int leftNodesCount() { + return leftNodesCnt; + } + + /** + * Gets failed nodes count. + * + * @return Failed nodes count. + */ + public synchronized int failedNodesCount() { + return failedNodesCnt; + } + + /** + * @return Ack timeouts count. + */ + public synchronized int ackTimeoutsCount() { + return ackTimeoutsCnt; + } + + /** + * @return Socket timeouts count. + */ + public synchronized int socketTimeoutsCount() { + return sockTimeoutsCnt; + } + + /** + * Gets socket readers created count. + * + * @return Socket readers created count. + */ + public synchronized int socketReadersCreated() { + return sockReadersCreated; + } + + /** + * Gets socket readers removed count. + * + * @return Socket readers removed count. + */ + public synchronized int socketReadersRemoved() { + return sockReadersRmv; + } + + /** + * Gets time local node has been coordinator since. + * + * @return Coordinator since timestamp. + */ + public long coordinatorSinceTimestamp() { + return crdSinceTs.get(); + } + + /** + * Clears statistics. + */ + public synchronized void clear() { + ackTimeoutsCnt = 0; + avgClientSockInitTime = 0; + avgMsgProcTime = 0; + avgMsgQueueTime = 0; + avgMsgsSndTimes.clear(); + avgRingMsgTime = 0; + avgSrvSockInitTime = 0; + clientSockCreatedCnt = 0; + crdSinceTs.set(0); + failedNodesCnt = 0; + joinedNodesCnt = 0; + joinFinishedTs = 0; + joinStartedTs = 0; + leftNodesCnt = 0; + maxClientSockInitTime = 0; + maxMsgProcTime = 0; + maxMsgQueueTime = 0; + maxMsgsSndTimes.clear(); + maxProcTimeMsgCls = null; + maxRingMsgTime = 0; + maxRingTimeMsgCls = null; + maxSrvSockInitTime = 0; + pendingMsgsDiscarded = 0; + pendingMsgsRegistered = 0; + procMsgs.clear(); + rcvdMsgs.clear(); + ringMsgsSent = 0; + sentMsgs.clear(); + sockReadersCreated = 0; + sockReadersRmv = 0; + sockTimeoutsCnt = 0; + } + + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return S.toString(TcpDiscoveryStatistics.class, this); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/package.html b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/package.html new file mode 100644 index 0000000..b44e05c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains internal implementation. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java new file mode 100644 index 0000000..27417f8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java @@ -0,0 +1,90 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder; + +import org.apache.ignite.spi.*; + +import java.net.*; +import java.util.*; + +/** + * IP finder interface for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}. + */ +public interface TcpDiscoveryIpFinder { + /** + * Callback invoked when SPI context is initialized after {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi#spiStart(String)} + * method is completed, SPI context can be stored for future access. + * + * @param spiCtx Spi context. + * @throws org.apache.ignite.spi.IgniteSpiException In case of error. + */ + public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException; + + /** + * Callback invoked prior to stopping grid before SPI context is destroyed. + * Note that invoking SPI context after this callback is complete is considered + * illegal and may produce unknown results. + */ + public void onSpiContextDestroyed(); + + /** + * Initializes addresses discovery SPI binds to. + * + * @param addrs Addresses discovery SPI binds to. + * @throws org.apache.ignite.spi.IgniteSpiException In case of error. + */ + public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException; + + /** + * Gets all addresses registered in this finder. + * + * @return All known addresses, potentially empty, but never {@code null}. + * @throws org.apache.ignite.spi.IgniteSpiException In case of error. + */ + public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException; + + /** + * Checks whether IP finder is shared or not. + * <p> + * If it is shared then only coordinator can unregister addresses. + * <p> + * All nodes should register their address themselves, as early as possible on node start. + * + * @return {@code true} if IP finder is shared. + */ + public boolean isShared(); + + /** + * Registers new addresses. + * <p> + * Implementation should accept duplicates quietly, but should not register address if it + * is already registered. + * + * @param addrs Addresses to register. Not {@code null} and not empty. + * @throws org.apache.ignite.spi.IgniteSpiException In case of error. + */ + public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException; + + /** + * Unregisters provided addresses. + * <p> + * Implementation should accept addresses that are currently not + * registered quietly (just no-op). + * + * @param addrs Addresses to unregister. Not {@code null} and not empty. + * @throws org.apache.ignite.spi.IgniteSpiException In case of error. + */ + public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException; + + /** + * Closes this IP finder and releases any system resources associated with it. + */ + public void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java new file mode 100644 index 0000000..502c1b6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java @@ -0,0 +1,77 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder; + +import org.apache.ignite.spi.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.net.*; +import java.util.*; + +/** + * IP finder interface implementation adapter. + */ +public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinder { + /** Shared flag. */ + private boolean shared; + + /** SPI context. */ + @GridToStringExclude + private volatile IgniteSpiContext spiCtx; + + /** {@inheritDoc} */ + @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException { + this.spiCtx = spiCtx; + } + + /** {@inheritDoc} */ + @Override public void onSpiContextDestroyed() { + spiCtx = null; + } + + /** {@inheritDoc} */ + @Override public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + registerAddresses(addrs); + } + + /** {@inheritDoc} */ + @Override public boolean isShared() { + return shared; + } + + /** + * Sets shared flag. If {@code true} then it is expected that IP addresses registered + * with IP finder will be seen by IP finders on all other nodes. + * + * @param shared {@code true} if this IP finder is shared. + */ + @IgniteSpiConfiguration(optional = true) + public void setShared(boolean shared) { + this.shared = shared; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryIpFinderAdapter.class, this); + } + + /** {@inheritDoc} */ + @Override public void close() { + // No-op. + } + + /** + * @return SPI context. + */ + protected IgniteSpiContext spiContext() { + return spiCtx; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java new file mode 100644 index 0000000..7514d2a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java @@ -0,0 +1,361 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc; + +import org.apache.ignite.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.grid.util.tostring.*; + +import javax.sql.*; +import java.net.*; +import java.sql.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.sql.Connection.*; + +/** + * JDBC-based IP finder. + * <h1 class="header">Configuration</h1> + * <h2 class="header">Mandatory</h2> + * <ul> + * <li>Data source (see {@link #setDataSource(DataSource)}).</li> + * </ul> + * <h2 class="header">Optional</h2> + * The following configuration parameters are optional: + * <ul> + * <li>Flag indicating whether DB schema should be initialized by GridGain (default behaviour) or + * was explicitly created by user (see {@link #setInitSchema(boolean)})</li> + * </ul> + * <p> + * The database will contain 1 table which will hold IP addresses. + */ +public class TcpDiscoveryJdbcIpFinder extends TcpDiscoveryIpFinderAdapter { + /** Query to get addresses. */ + public static final String GET_ADDRS_QRY = "select hostname, port from tbl_addrs"; + + /** Query to register address. */ + public static final String REG_ADDR_QRY = "insert into tbl_addrs values (?, ?)"; + + /** Query to unregister address. */ + public static final String UNREG_ADDR_QRY = "delete from tbl_addrs where hostname = ? and port = ?"; + + /** Query to create addresses table. */ + public static final String CREATE_ADDRS_TABLE_QRY = + "create table if not exists tbl_addrs (" + + "hostname VARCHAR(1024), " + + "port INT)"; + + /** Query to check database validity. */ + public static final String CHK_QRY = "select count(*) from tbl_addrs"; + + /** Grid logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Data source. */ + private DataSource dataSrc; + + /** Flag for schema initialization. */ + private boolean initSchema = true; + + /** Init guard. */ + @GridToStringExclude + private final AtomicBoolean initGuard = new AtomicBoolean(); + + /** Init latch. */ + @GridToStringExclude + private final CountDownLatch initLatch = new CountDownLatch(1); + + /** + * Constructor. + */ + public TcpDiscoveryJdbcIpFinder() { + setShared(true); + } + + /** {@inheritDoc} */ + @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException { + init(); + + Connection conn = null; + + PreparedStatement stmt = null; + + ResultSet rs = null; + + try { + conn = dataSrc.getConnection(); + + conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); + + stmt = conn.prepareStatement(GET_ADDRS_QRY); + + rs = stmt.executeQuery(); + + Collection<InetSocketAddress> addrs = new LinkedList<>(); + + while (rs.next()) + addrs.add(new InetSocketAddress(rs.getString(1), rs.getInt(2))); + + return addrs; + } + catch (SQLException e) { + throw new IgniteSpiException("Failed to get registered addresses version.", e); + } + finally { + U.closeQuiet(rs); + U.closeQuiet(stmt); + U.closeQuiet(conn); + } + } + + /** {@inheritDoc} */ + @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + assert !F.isEmpty(addrs); + + init(); + + Connection conn = null; + + PreparedStatement stmtUnreg = null; + + PreparedStatement stmtReg = null; + + boolean committed = false; + + try { + conn = dataSrc.getConnection(); + + conn.setAutoCommit(false); + + conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); + + stmtUnreg = conn.prepareStatement(UNREG_ADDR_QRY); + stmtReg = conn.prepareStatement(REG_ADDR_QRY); + + for (InetSocketAddress addr : addrs) { + stmtUnreg.setString(1, addr.getAddress().getHostAddress()); + stmtUnreg.setInt(2, addr.getPort()); + + stmtUnreg.addBatch(); + + stmtReg.setString(1, addr.getAddress().getHostAddress()); + stmtReg.setInt(2, addr.getPort()); + + stmtReg.addBatch(); + } + + stmtUnreg.executeBatch(); + stmtUnreg.close(); + + stmtReg.executeBatch(); + stmtReg.close(); + + conn.commit(); + + committed = true; + } + catch (SQLException e) { + U.rollbackConnectionQuiet(conn); + + throw new IgniteSpiException("Failed to register addresses: " + addrs, e); + } + finally { + if (!committed) + U.rollbackConnectionQuiet(conn); + + U.closeQuiet(stmtUnreg); + U.closeQuiet(stmtReg); + U.closeQuiet(conn); + } + } + + /** {@inheritDoc} */ + @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + assert !F.isEmpty(addrs); + + init(); + + Connection conn = null; + + PreparedStatement stmt = null; + + boolean committed = false; + + try { + conn = dataSrc.getConnection(); + + conn.setAutoCommit(false); + + conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); + + stmt = conn.prepareStatement(UNREG_ADDR_QRY); + + for (InetSocketAddress addr : addrs) { + stmt.setString(1, addr.getAddress().getHostAddress()); + stmt.setInt(2, addr.getPort()); + + stmt.addBatch(); + } + + stmt.executeBatch(); + conn.commit(); + + committed = true; + } + catch (SQLException e) { + U.rollbackConnectionQuiet(conn); + + throw new IgniteSpiException("Failed to unregister addresses: " + addrs, e); + } + finally { + if (!committed) + U.rollbackConnectionQuiet(conn); + + U.closeQuiet(stmt); + U.closeQuiet(conn); + } + } + + /** + * Sets data source. + * <p> + * Data source should be fully configured and ready-to-use. + * + * @param dataSrc Data source. + */ + @IgniteSpiConfiguration(optional = false) + public void setDataSource(DataSource dataSrc) { + this.dataSrc = dataSrc; + } + + /** + * Flag indicating whether DB schema should be initialized by GridGain (default behaviour) or + * was explicitly created by user. + * + * @param initSchema {@code True} if DB schema should be initialized by GridGain (default behaviour), + * {code @false} if schema was explicitly created by user. + */ + @IgniteSpiConfiguration(optional = true) + public void setInitSchema(boolean initSchema) { + this.initSchema = initSchema; + } + + /** + * Checks configuration validity. + * + * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. + */ + private void init() throws IgniteSpiException { + if (initGuard.compareAndSet(false, true)) { + if (dataSrc == null) + throw new IgniteSpiException("Data source is null (you must configure it via setDataSource(..)" + + " configuration property)"); + + if (!initSchema) { + initLatch.countDown(); + + checkSchema(); + + return; + } + + Connection conn = null; + + Statement stmt = null; + + boolean committed = false; + + try { + conn = dataSrc.getConnection(); + + conn.setAutoCommit(false); + + conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); + + // Create tbl_addrs. + stmt = conn.createStatement(); + + stmt.executeUpdate(CREATE_ADDRS_TABLE_QRY); + + conn.commit(); + + committed = true; + + if (log.isDebugEnabled()) + log.debug("DB schema has been initialized."); + } + catch (SQLException e) { + U.rollbackConnectionQuiet(conn); + + throw new IgniteSpiException("Failed to initialize DB schema.", e); + } + finally { + if (!committed) + U.rollbackConnectionQuiet(conn); + + U.closeQuiet(stmt); + U.closeQuiet(conn); + + initLatch.countDown(); + } + } + else + checkSchema(); + } + + /** + * Checks correctness of existing DB schema. + * + * @throws org.apache.ignite.spi.IgniteSpiException If schema wasn't properly initialized. + */ + private void checkSchema() throws IgniteSpiException { + try { + U.await(initLatch); + } + catch (GridInterruptedException e) { + throw new IgniteSpiException("Thread has been interrupted.", e); + } + + Connection conn = null; + + Statement stmt = null; + + try { + conn = dataSrc.getConnection(); + + conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); + + // Check if tbl_addrs exists and database initialized properly. + stmt = conn.createStatement(); + + stmt.execute(CHK_QRY); + } + catch (SQLException e) { + throw new IgniteSpiException("IP finder has not been properly initialized.", e); + } + finally { + U.closeQuiet(stmt); + U.closeQuiet(conn); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryJdbcIpFinder.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/package.html b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/package.html new file mode 100644 index 0000000..dfe23f6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains JDBC IP finder implementation. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java new file mode 100644 index 0000000..2e181b5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java @@ -0,0 +1,752 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder.multicast; + +import org.apache.ignite.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.grid.util.tostring.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +import static org.apache.ignite.IgniteSystemProperties.*; +import static org.apache.ignite.spi.IgnitePortProtocol.*; + +/** + * Multicast-based IP finder. + * <p> + * When TCP discovery starts this finder sends multicast request and waits + * for some time when others nodes reply to this request with messages containing + * their addresses (time IP finder waits for response and number of attempts to + * re-send multicast request in case if no replies are received can be configured, + * see {@link #setResponseWaitTime(int)} and {@link #setAddressRequestAttempts(int)}). + * <p> + * In addition to address received via multicast this finder can work with pre-configured + * list of addresses specified via {@link #setAddresses(Collection)} method. + * <h1 class="header">Configuration</h1> + * <h2 class="header">Mandatory</h2> + * There are no mandatory configuration parameters. + * <h2 class="header">Optional</h2> + * <ul> + * <li>Multicast IP address (see {@link #setMulticastGroup(String)}).</li> + * <li>Multicast port number (see {@link #setMulticastPort(int)}).</li> + * <li>Address response wait time (see {@link #setResponseWaitTime(int)}).</li> + * <li>Address request attempts (see {@link #setAddressRequestAttempts(int)}).</li> + * <li>Pre-configured addresses (see {@link #setAddresses(Collection)})</li> + * <li>Local address (see {@link #setLocalAddress(String)})</li> + * </ul> + */ +public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { + /** Default multicast IP address (value is {@code 228.1.2.4}). */ + public static final String DFLT_MCAST_GROUP = "228.1.2.4"; + + /** Default multicast port number (value is {@code 47400}). */ + public static final int DFLT_MCAST_PORT = 47400; + + /** Default time IP finder waits for reply to multicast address request (value is {@code 500}). */ + public static final int DFLT_RES_WAIT_TIME = 500; + + /** Default number of attempts to send multicast address request (value is {@code 2}). */ + public static final int DFLT_ADDR_REQ_ATTEMPTS = 2; + + /** Address request message data. */ + private static final byte[] MSG_ADDR_REQ_DATA = U.GG_HEADER; + + /** */ + private static final IgniteMarshaller marsh = new IgniteJdkMarshaller(); + + /** Grid logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Grid name. */ + @IgniteNameResource + @GridToStringExclude + private String gridName; + + /** Multicast IP address as string. */ + private String mcastGrp = DFLT_MCAST_GROUP; + + /** Multicast port number. */ + private int mcastPort = DFLT_MCAST_PORT; + + /** Time IP finder waits for reply to multicast address request. */ + private int resWaitTime = DFLT_RES_WAIT_TIME; + + /** Number of attempts to send multicast address request. */ + private int addrReqAttempts = DFLT_ADDR_REQ_ATTEMPTS; + + /** Local address */ + private String locAddr; + + /** */ + @GridToStringExclude + private Collection<AddressSender> addrSnds; + + /** + * Constructs new IP finder. + */ + public TcpDiscoveryMulticastIpFinder() { + setShared(true); + } + + /** + * Sets IP address of multicast group. + * <p> + * If not provided, default value is {@link #DFLT_MCAST_GROUP}. + * + * @param mcastGrp Multicast IP address. + */ + @IgniteSpiConfiguration(optional = true) + public void setMulticastGroup(String mcastGrp) { + this.mcastGrp = mcastGrp; + } + + /** + * Gets IP address of multicast group. + * + * @return Multicast IP address. + */ + public String getMulticastGroup() { + return mcastGrp; + } + + /** + * Sets port number which multicast messages are sent to. + * <p> + * If not provided, default value is {@link #DFLT_MCAST_PORT}. + * + * @param mcastPort Multicast port number. + */ + @IgniteSpiConfiguration(optional = true) + public void setMulticastPort(int mcastPort) { + this.mcastPort = mcastPort; + } + + /** + * Gets port number which multicast messages are sent to. + * + * @return Port number. + */ + public int getMulticastPort() { + return mcastPort; + } + + /** + * Sets time in milliseconds IP finder waits for reply to + * multicast address request. + * <p> + * If not provided, default value is {@link #DFLT_RES_WAIT_TIME}. + * + * @param resWaitTime Time IP finder waits for reply to multicast address request. + */ + @IgniteSpiConfiguration(optional = true) + public void setResponseWaitTime(int resWaitTime) { + this.resWaitTime = resWaitTime; + } + + /** + * Gets time in milliseconds IP finder waits for reply to + * multicast address request. + * + * @return Time IP finder waits for reply to multicast address request. + */ + public int getResponseWaitTime() { + return resWaitTime; + } + + /** + * Sets number of attempts to send multicast address request. IP finder re-sends + * request only in case if no reply for previous request is received. + * <p> + * If not provided, default value is {@link #DFLT_ADDR_REQ_ATTEMPTS}. + * + * @param addrReqAttempts Number of attempts to send multicast address request. + */ + @IgniteSpiConfiguration(optional = true) + public void setAddressRequestAttempts(int addrReqAttempts) { + this.addrReqAttempts = addrReqAttempts; + } + + /** + * Gets number of attempts to send multicast address request. IP finder re-sends + * request only in case if no reply for previous request is received. + * + * @return Number of attempts to send multicast address request. + */ + public int getAddressRequestAttempts() { + return addrReqAttempts; + } + + /** + * Sets local host address used by this IP finder. If provided address is non-loopback then multicast + * socket is bound to this interface. If local address is not set or is any local address then IP finder + * creates multicast sockets for all found non-loopback addresses. + * <p> + * If not provided then this property is initialized by the local address set in {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} + * configuration. + * + * @param locAddr Local host address. + * @see org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi#setLocalAddress(String) + */ + @IgniteSpiConfiguration(optional = true) + public void setLocalAddress(String locAddr) { + this.locAddr = locAddr; + } + + /** + * Gets local address that multicast IP finder uses. + * + * @return Local address. + */ + public String getLocalAddress() { + return locAddr; + } + + /** {@inheritDoc} */ + @Override public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + // If GRIDGAIN_OVERRIDE_MCAST_GRP system property is set, use its value to override multicast group from + // configuration. Used for testing purposes. + String overrideMcastGrp = System.getProperty(GG_OVERRIDE_MCAST_GRP); + + if (overrideMcastGrp != null) + mcastGrp = overrideMcastGrp; + + if (F.isEmpty(mcastGrp)) + throw new IgniteSpiException("Multicast IP address is not specified."); + + if (mcastPort < 0 || mcastPort > 65535) + throw new IgniteSpiException("Invalid multicast port: " + mcastPort); + + if (resWaitTime <= 0) + throw new IgniteSpiException("Invalid wait time, value greater than zero is expected: " + resWaitTime); + + if (addrReqAttempts <= 0) + throw new IgniteSpiException("Invalid number of address request attempts, " + + "value greater than zero is expected: " + addrReqAttempts); + + if (F.isEmpty(getRegisteredAddresses())) + U.warn(log, "GridTcpDiscoveryMulticastIpFinder has no pre-configured addresses " + + "(it is recommended in production to specify at least one address in " + + "GridTcpDiscoveryMulticastIpFinder.getAddresses() configuration property)"); + + InetAddress mcastAddr; + + try { + mcastAddr = InetAddress.getByName(mcastGrp); + } + catch (UnknownHostException e) { + throw new IgniteSpiException("Unknown multicast group: " + mcastGrp, e); + } + + if (!mcastAddr.isMulticastAddress()) + throw new IgniteSpiException("Invalid multicast group address: " + mcastAddr); + + Collection<String> locAddrs; + + try { + locAddrs = U.resolveLocalAddresses(U.resolveLocalHost(locAddr)).get1(); + } + catch (IOException | GridException e) { + throw new IgniteSpiException("Failed to resolve local addresses [locAddr=" + locAddr + ']', e); + } + + assert locAddrs != null; + + addrSnds = new ArrayList<>(locAddrs.size()); + + Collection<InetAddress> reqItfs = new ArrayList<>(locAddrs.size()); // Interfaces used to send requests. + + for (String locAddr : locAddrs) { + InetAddress addr; + + try { + addr = InetAddress.getByName(locAddr); + } + catch (UnknownHostException e) { + if (log.isDebugEnabled()) + log.debug("Failed to resolve local address [locAddr=" + locAddr + ", err=" + e + ']'); + + continue; + } + + if (!addr.isLoopbackAddress()) { + try { + addrSnds.add(new AddressSender(mcastAddr, addr, addrs)); + + reqItfs.add(addr); + } + catch (IOException e) { + if (log.isDebugEnabled()) + log.debug("Failed to create multicast socket [mcastAddr=" + mcastAddr + + ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ", locAddr=" + addr + + ", err=" + e + ']'); + } + } + } + + if (addrSnds.isEmpty()) { + try { + // Create non-bound socket if local host is loopback or failed to create sockets explicitly + // bound to interfaces. + addrSnds.add(new AddressSender(mcastAddr, null, addrs)); + } + catch (IOException e) { + throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr + + ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e); + } + } + + for (AddressSender addrSnd :addrSnds) + addrSnd.start(); + + Collection<InetSocketAddress> ret; + + if (reqItfs.size() > 1) { + ret = new HashSet<>(); + + Collection<AddressReceiver> rcvrs = new ArrayList<>(); + + for (InetAddress itf : reqItfs) { + AddressReceiver rcvr = new AddressReceiver(mcastAddr, itf); + + rcvr.start(); + + rcvrs.add(rcvr); + } + + for (AddressReceiver rcvr : rcvrs) { + try { + rcvr.join(); + + ret.addAll(rcvr.addresses()); + } + catch (InterruptedException ignore) { + U.warn(log, "Got interrupted while receiving address request."); + + Thread.currentThread().interrupt(); + + break; + } + } + } + else + ret = requestAddresses(mcastAddr, F.first(reqItfs)); + + if (!ret.isEmpty()) + registerAddresses(ret); + } + + /** {@inheritDoc} */ + @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException { + super.onSpiContextInitialized(spiCtx); + + spiCtx.registerPort(mcastPort, UDP); + } + + /** + * Sends multicast address request message and waits for reply. Response wait time and number + * of request attempts are configured as properties {@link #setResponseWaitTime} and + * {@link #setAddressRequestAttempts}. + * + * @param mcastAddr Multicast address where to send request. + * @param sockItf Optional interface multicast socket should be bound to. + * @return Collection of received addresses. + */ + private Collection<InetSocketAddress> requestAddresses(InetAddress mcastAddr, @Nullable InetAddress sockItf) { + Collection<InetSocketAddress> rmtAddrs = new HashSet<>(); + + try { + DatagramPacket reqPckt = new DatagramPacket(MSG_ADDR_REQ_DATA, MSG_ADDR_REQ_DATA.length, + mcastAddr, mcastPort); + + byte[] resData = new byte[AddressResponse.MAX_DATA_LENGTH]; + + DatagramPacket resPckt = new DatagramPacket(resData, resData.length); + + boolean sndError = false; + + for (int i = 0; i < addrReqAttempts; i++) { + MulticastSocket sock = null; + + try { + sock = new MulticastSocket(0); + + // Use 'false' to enable support for more than one node on the same machine. + sock.setLoopbackMode(false); + + if (sockItf != null) + sock.setInterface(sockItf); + + sock.setSoTimeout(resWaitTime); + + reqPckt.setData(MSG_ADDR_REQ_DATA); + + try { + sock.send(reqPckt); + } + catch (IOException e) { + if (!handleNetworkError(e)) + break; + + if (i < addrReqAttempts - 1) { + if (log.isDebugEnabled()) + log.debug("Failed to send multicast address request (will retry in 500 ms): " + e); + + U.sleep(500); + } + else { + if (log.isDebugEnabled()) + log.debug("Failed to send multicast address request: " + e); + } + + sndError = true; + + continue; + } + + long rcvEnd = U.currentTimeMillis() + resWaitTime; + + try { + while (U.currentTimeMillis() < rcvEnd) { // Try to receive multiple responses. + sock.receive(resPckt); + + byte[] data = resPckt.getData(); + + if (!U.bytesEqual(U.GG_HEADER, 0, data, 0, U.GG_HEADER.length)) { + U.error(log, "Failed to verify message header."); + + continue; + } + + AddressResponse addrRes; + + try { + addrRes = new AddressResponse(data); + } + catch (GridException e) { + LT.warn(log, e, "Failed to deserialize multicast response."); + + continue; + } + + rmtAddrs.addAll(addrRes.addresses()); + } + } + catch (SocketTimeoutException ignored) { + if (log.isDebugEnabled()) // DatagramSocket.receive timeout has expired. + log.debug("Address receive timeout."); + } + } + catch (IOException e) { + U.error(log, "Failed to request nodes addresses.", e); + } + finally { + U.close(sock); + } + + if (!rmtAddrs.isEmpty()) + break; + + if (i < addrReqAttempts - 1) // Wait some time before re-sending address request. + U.sleep(200); + } + + if (log.isDebugEnabled()) + log.debug("Received nodes addresses: " + rmtAddrs); + + if (rmtAddrs.isEmpty() && sndError) + U.quietAndWarn(log, "Failed to send multicast message (is multicast enabled on this node?)."); + + return rmtAddrs; + } + catch (GridInterruptedException ignored) { + U.warn(log, "Got interrupted while sending address request."); + + Thread.currentThread().interrupt(); + + return rmtAddrs; + } + } + + /** {@inheritDoc} */ + @Override public void close() { + for (AddressSender addrSnd : addrSnds) + U.interrupt(addrSnd); + + for (AddressSender addrSnd : addrSnds) + U.join(addrSnd, log); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryMulticastIpFinder.class, this, "super", super.toString()); + } + + /** + * @param e Network error to handle. + * @return {@code True} if this error is recoverable and the operation can be retried. + */ + private boolean handleNetworkError(IOException e) { + if ("Network is unreachable".equals(e.getMessage()) && U.isMacOs()) { + U.warn(log, "Multicast does not work on Mac OS JVM loopback address (configure external IP address " + + "for 'localHost' configuration property)"); + + return false; + } + + return true; + } + + /** + * Response to multicast address request. + */ + private static class AddressResponse { + /** Maximum supported multicast message. */ + public static final int MAX_DATA_LENGTH = 64 * 1024; + + /** */ + private byte[] data; + + /** */ + private Collection<InetSocketAddress> addrs; + + /** + * @param addrs Addresses discovery SPI binds to. + * @throws GridException If marshalling failed. + */ + private AddressResponse(Collection<InetSocketAddress> addrs) throws GridException { + this.addrs = addrs; + + byte[] addrsData = marsh.marshal(addrs); + data = new byte[U.GG_HEADER.length + addrsData.length]; + + if (data.length > MAX_DATA_LENGTH) + throw new GridException("Too long data packet [size=" + data.length + ", max=" + MAX_DATA_LENGTH + "]"); + + System.arraycopy(U.GG_HEADER, 0, data, 0, U.GG_HEADER.length); + System.arraycopy(addrsData, 0, data, 4, addrsData.length); + } + + /** + * @param data Message data. + * @throws GridException If unmarshalling failed. + */ + private AddressResponse(byte[] data) throws GridException { + assert U.bytesEqual(U.GG_HEADER, 0, data, 0, U.GG_HEADER.length); + + this.data = data; + + addrs = marsh.unmarshal(Arrays.copyOfRange(data, U.GG_HEADER.length, data.length), null); + } + + /** + * @return Message data. + */ + byte[] data() { + return data; + } + + /** + * @return IP address discovery SPI binds to. + */ + public Collection<InetSocketAddress> addresses() { + return addrs; + } + } + + /** + * Thread sends multicast address request message and waits for reply. + */ + private class AddressReceiver extends IgniteSpiThread { + /** */ + private final InetAddress mcastAddr; + + /** */ + private final InetAddress sockAddr; + + /** */ + private Collection<InetSocketAddress> addrs; + + /** + * @param mcastAddr Multicast address where to send request. + * @param sockAddr Optional address multicast socket should be bound to. + */ + private AddressReceiver(InetAddress mcastAddr, InetAddress sockAddr) { + super(gridName, "tcp-disco-multicast-addr-rcvr", log); + this.mcastAddr = mcastAddr; + this.sockAddr = sockAddr; + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + addrs = requestAddresses(mcastAddr, sockAddr); + } + + /** + * @return Received addresses. + */ + Collection<InetSocketAddress> addresses() { + return addrs; + } + } + + /** + * Thread listening for multicast address requests and sending response + * containing socket address this node's discovery SPI listens to. + */ + private class AddressSender extends IgniteSpiThread { + /** */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private MulticastSocket sock; + + /** */ + private final InetAddress mcastGrp; + + /** */ + private final Collection<InetSocketAddress> addrs; + + /** */ + private final InetAddress sockItf; + + /** + * @param mcastGrp Multicast address. + * @param sockItf Optional interface multicast socket should be bound to. + * @param addrs Local node addresses. + * @throws IOException If fails to create multicast socket. + */ + private AddressSender(InetAddress mcastGrp, @Nullable InetAddress sockItf, Collection<InetSocketAddress> addrs) + throws IOException { + super(gridName, "tcp-disco-multicast-addr-sender", log); + this.mcastGrp = mcastGrp; + this.addrs = addrs; + this.sockItf = sockItf; + + sock = createSocket(); + } + + /** + * Creates multicast socket and joins multicast group. + * + * @throws IOException If fails to create socket or join multicast group. + * @return Multicast socket. + */ + private MulticastSocket createSocket() throws IOException { + MulticastSocket sock = new MulticastSocket(mcastPort); + + sock.setLoopbackMode(false); // Use 'false' to enable support for more than one node on the same machine. + + if (sockItf != null) + sock.setInterface(sockItf); + + if (sock.getLoopbackMode()) + U.warn(log, "Loopback mode is disabled which prevents nodes on the same machine from discovering " + + "each other."); + + sock.joinGroup(mcastGrp); + + return sock; + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + AddressResponse res; + + try { + res = new AddressResponse(addrs); + } + catch (GridException e) { + U.error(log, "Failed to prepare multicast message.", e); + + return; + } + + byte[] reqData = new byte[MSG_ADDR_REQ_DATA.length]; + + DatagramPacket pckt = new DatagramPacket(reqData, reqData.length); + + while (!isInterrupted()) { + try { + MulticastSocket sock; + + synchronized (this) { + if (isInterrupted()) + return; + + sock = this.sock; + + if (sock == null) + sock = createSocket(); + } + + sock.receive(pckt); + + if (!U.bytesEqual(U.GG_HEADER, 0, reqData, 0, U.GG_HEADER.length)) { + U.error(log, "Failed to verify message header."); + + continue; + } + + try { + sock.send(new DatagramPacket(res.data(), res.data().length, pckt.getAddress(), pckt.getPort())); + } + catch (IOException e) { + if (e.getMessage().contains("Operation not permitted")) { + if (log.isDebugEnabled()) + log.debug("Got 'operation not permitted' error, ignoring: " + e); + } + else + throw e; + } + } + catch (IOException e) { + if (!isInterrupted()) { + U.error(log, "Failed to send/receive address message (will try to reconnect).", e); + + synchronized (this) { + U.close(sock); + + sock = null; + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public void interrupt() { + super.interrupt(); + + synchronized (this) { + U.close(sock); + + sock = null; + } + } + + /** {@inheritDoc} */ + @Override protected void cleanup() { + synchronized (this) { + U.close(sock); + + sock = null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/package.html b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/package.html new file mode 100644 index 0000000..c3ced9e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains multicast-based IP finder. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/package.html b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/package.html new file mode 100644 index 0000000..0a05db1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains IP finder interface and adapter. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java new file mode 100644 index 0000000..d47f9ac --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java @@ -0,0 +1,258 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs; + +import org.apache.ignite.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.grid.util.tostring.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Shared filesystem-based IP finder. + * <h1 class="header">Configuration</h1> + * <h2 class="header">Mandatory</h2> + * There are no mandatory configuration parameters. + * <h2 class="header">Optional</h2> + * <ul> + * <li>Path (see {@link #setPath(String)})</li> + * <li>Shared flag (see {@link #setShared(boolean)})</li> + * </ul> + * <p> + * If {@link #getPath()} is not provided, then {@link #DFLT_PATH} will be used and + * only local nodes will discover each other. To enable discovery over network + * you must provide a path to a shared directory explicitly. + * <p> + * The directory will contain empty files named like the following 192.168.1.136#1001. + * <p> + * Note that this finder is shared by default (see {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()}. + */ +public class TcpDiscoverySharedFsIpFinder extends TcpDiscoveryIpFinderAdapter { + /** + * Default path for discovering of local nodes (testing only). Note that this path is relative to + * {@code GRIDGAIN_HOME/work} folder if {@code GRIDGAIN_HOME} system or environment variable specified, + * otherwise it is relative to {@code work} folder under system {@code java.io.tmpdir} folder. + * + * @see org.apache.ignite.configuration.IgniteConfiguration#getWorkDirectory() + */ + public static final String DFLT_PATH = "disco/tcp"; + + /** Delimiter to use between address and port tokens in file names. */ + public static final String DELIM = "#"; + + /** Grid logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** File-system path. */ + private String path = DFLT_PATH; + + /** Folder to keep items in. */ + @GridToStringExclude + private File folder; + + /** Warning guard. */ + @GridToStringExclude + private final AtomicBoolean warnGuard = new AtomicBoolean(); + + /** Init guard. */ + @GridToStringExclude + private final AtomicBoolean initGuard = new AtomicBoolean(); + + /** Init latch. */ + @GridToStringExclude + private final CountDownLatch initLatch = new CountDownLatch(1); + + /** + * Constructor. + */ + public TcpDiscoverySharedFsIpFinder() { + setShared(true); + } + + /** + * Gets path. + * + * @return Shared path. + */ + public String getPath() { + return path; + } + + /** + * Sets path. + * + * @param path Shared path. + */ + @IgniteSpiConfiguration(optional = true) + public void setPath(String path) { + this.path = path; + } + + /** + * Initializes folder to work with. + * + * @return Folder. + * @throws org.apache.ignite.spi.IgniteSpiException If failed. + */ + private File initFolder() throws IgniteSpiException { + if (initGuard.compareAndSet(false, true)) { + if (path == null) + throw new IgniteSpiException("Shared file system path is null " + + "(it should be configured via setPath(..) configuration property)."); + + if (path.equals(DFLT_PATH) && warnGuard.compareAndSet(false, true)) + U.warn(log, "Default local computer-only share is used by IP finder."); + + try { + File tmp; + + if (new File(path).exists()) + tmp = new File(path); + else { + try { + tmp = U.resolveWorkDirectory(path, false); + } + catch (GridException e) { + throw new IgniteSpiException("Failed to resolve directory [path=" + path + + ", exception=" + e.getMessage() + ']'); + } + } + + if (!tmp.isDirectory()) + throw new IgniteSpiException("Failed to initialize shared file system path " + + "(path must point to folder): " + path); + + if (!tmp.canRead() || !tmp.canWrite()) + throw new IgniteSpiException("Failed to initialize shared file system path " + + "(path must be readable and writable): " + path); + + folder = tmp; + } + finally { + initLatch.countDown(); + } + } + else { + try { + U.await(initLatch); + } + catch (GridInterruptedException e) { + throw new IgniteSpiException("Thread has been interrupted.", e); + } + + if (folder == null) + throw new IgniteSpiException("Failed to initialize shared file system folder (check logs for errors)."); + } + + return folder; + } + + /** {@inheritDoc} */ + @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException { + initFolder(); + + Collection<InetSocketAddress> addrs = new LinkedList<>(); + + for (String fileName : folder.list()) + if (!".svn".equals(fileName)) { + InetSocketAddress addr = null; + + StringTokenizer st = new StringTokenizer(fileName, DELIM); + + if (st.countTokens() == 2) { + String addrStr = st.nextToken(); + String portStr = st.nextToken(); + + try { + int port = Integer.parseInt(portStr); + + addr = new InetSocketAddress(addrStr, port); + } + catch (IllegalArgumentException e) { + U.error(log, "Failed to parse file entry: " + fileName, e); + } + } + + if (addr != null) + addrs.add(addr); + } + + return Collections.unmodifiableCollection(addrs); + } + + /** {@inheritDoc} */ + @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + assert !F.isEmpty(addrs); + + initFolder(); + + try { + for (InetSocketAddress addr : addrs) { + File file = new File(folder, name(addr)); + + file.createNewFile(); + } + } + catch (IOException e) { + throw new IgniteSpiException("Failed to create file.", e); + } + } + + /** {@inheritDoc} */ + @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + assert !F.isEmpty(addrs); + + initFolder(); + + try { + for (InetSocketAddress addr : addrs) { + File file = new File(folder, name(addr)); + + file.delete(); + } + } + catch (SecurityException e) { + throw new IgniteSpiException("Failed to delete file.", e); + } + } + + /** + * Creates file name for address. + * + * @param addr Node address. + * @return Name. + */ + private String name(InetSocketAddress addr) { + assert addr != null; + + SB sb = new SB(); + + sb.a(addr.getAddress().getHostAddress()) + .a(DELIM) + .a(addr.getPort()); + + return sb.toString(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoverySharedFsIpFinder.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/package.html b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/package.html new file mode 100644 index 0000000..e1305a3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains shared filesystem-based IP finder. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java new file mode 100644 index 0000000..a6e9c0a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java @@ -0,0 +1,255 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder.vm; + +import org.apache.ignite.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.net.*; +import java.util.*; + +import static org.apache.ignite.IgniteSystemProperties.*; + +/** + * IP Finder which works only with pre-configured list of IP addresses specified + * via {@link #setAddresses(Collection)} method. By default, this IP finder is + * not {@code shared}, which means that all grid nodes have to be configured with the + * same list of IP addresses when this IP finder is used. + * <h1 class="header">Configuration</h1> + * <h2 class="header">Mandatory</h2> + * There are no mandatory configuration parameters. + * <h2 class="header">Optional</h2> + * <ul> + * <li>Addresses for initialization (see {@link #setAddresses(Collection)})</li> + * <li>Shared flag (see {@link #setShared(boolean)})</li> + * </ul> + */ +public class TcpDiscoveryVmIpFinder extends TcpDiscoveryIpFinderAdapter { + /** Grid logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Addresses. */ + @GridToStringInclude + private Collection<InetSocketAddress> addrs; + + /** + * Initialize from system property. + */ + { + String ips = IgniteSystemProperties.getString(GG_TCP_DISCOVERY_ADDRESSES); + + if (!F.isEmpty(ips)) { + Collection<InetSocketAddress> addrsList = new LinkedHashSet<>(); + + for (String s : ips.split(",")) { + if (!F.isEmpty(s)) { + s = s.trim(); + + if (!F.isEmpty(s)) { + try { + addrsList.addAll(address(s)); + } + catch (IgniteSpiException e) { + throw new GridRuntimeException(e); + } + } + } + } + + addrs = addrsList; + } + else + addrs = new LinkedHashSet<>(); + } + + /** + * Constructs new IP finder. + */ + public TcpDiscoveryVmIpFinder() { + // No-op. + } + + /** + * Constructs new IP finder. + * + * @param shared {@code true} if IP finder is shared. + * @see #setShared(boolean) + */ + public TcpDiscoveryVmIpFinder(boolean shared) { + setShared(shared); + } + + /** + * Parses provided values and initializes the internal collection of addresses. + * <p> + * Addresses may be represented as follows: + * <ul> + * <li>IP address (e.g. 127.0.0.1, 9.9.9.9, etc);</li> + * <li>IP address and port (e.g. 127.0.0.1:47500, 9.9.9.9:47501, etc);</li> + * <li>IP address and port range (e.g. 127.0.0.1:47500..47510, 9.9.9.9:47501..47504, etc);</li> + * <li>Hostname (e.g. host1.com, host2, etc);</li> + * <li>Hostname and port (e.g. host1.com:47500, host2:47502, etc).</li> + * <li>Hostname and port range (e.g. host1.com:47500..47510, host2:47502..47508, etc).</li> + * </ul> + * <p> + * If port is 0 or not provided then default port will be used (depends on + * discovery SPI configuration). + * <p> + * If port range is provided (e.g. host:port1..port2) the following should be considered: + * <ul> + * <li>{@code port1 < port2} should be {@code true};</li> + * <li>Both {@code port1} and {@code port2} should be greater than {@code 0}.</li> + * </ul> + * + * @param addrs Known nodes addresses. + * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. + */ + @IgniteSpiConfiguration(optional = true) + public synchronized void setAddresses(Collection<String> addrs) throws IgniteSpiException { + if (F.isEmpty(addrs)) + return; + + Collection<InetSocketAddress> newAddrs = new LinkedHashSet<>(); + + for (String ipStr : addrs) + newAddrs.addAll(address(ipStr)); + + this.addrs = newAddrs; + } + + /** + * Creates address from string. + * + * @param ipStr Address string. + * @return Socket addresses (may contain 1 or more addresses if provided string + * includes port range). + * @throws org.apache.ignite.spi.IgniteSpiException If failed. + */ + private static Collection<InetSocketAddress> address(String ipStr) throws IgniteSpiException { + ipStr = ipStr.trim(); + + String errMsg = "Failed to parse provided address: " + ipStr; + + int colonCnt = ipStr.length() - ipStr.replace(":", "").length(); + + if (colonCnt > 1) { + // IPv6 address (literal IPv6 addresses are enclosed in square brackets, for example + // https://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:443). + if (ipStr.startsWith("[")) { + ipStr = ipStr.substring(1); + + if (ipStr.contains("]:")) + return addresses(ipStr, "\\]\\:", errMsg); + else if (ipStr.endsWith("]")) + ipStr = ipStr.substring(0, ipStr.length() - 1); + else + throw new IgniteSpiException(errMsg); + } + } + else { + // IPv4 address. + if (ipStr.endsWith(":")) + ipStr = ipStr.substring(0, ipStr.length() - 1); + else if (ipStr.indexOf(':') >= 0) + return addresses(ipStr, "\\:", errMsg); + } + + // Provided address does not contain port (will use default one). + return Collections.singleton(new InetSocketAddress(ipStr, 0)); + } + + /** + * Creates address from string with port information. + * + * @param ipStr Address string + * @param regexDelim Port regex delimiter. + * @param errMsg Error message. + * @return Socket addresses (may contain 1 or more addresses if provided string + * includes port range). + * @throws org.apache.ignite.spi.IgniteSpiException If failed. + */ + private static Collection<InetSocketAddress> addresses(String ipStr, String regexDelim, String errMsg) + throws IgniteSpiException { + String[] tokens = ipStr.split(regexDelim); + + if (tokens.length == 2) { + String addrStr = tokens[0]; + String portStr = tokens[1]; + + if (portStr.contains("..")) { + try { + int port1 = Integer.parseInt(portStr.substring(0, portStr.indexOf(".."))); + int port2 = Integer.parseInt(portStr.substring(portStr.indexOf("..") + 2, portStr.length())); + + if (port2 < port1 || port1 == port2 || port1 <= 0 || port2 <= 0) + throw new IgniteSpiException(errMsg); + + Collection<InetSocketAddress> res = new ArrayList<>(port2 - port1); + + // Upper bound included. + for (int i = port1; i <= port2; i++) + res.add(new InetSocketAddress(addrStr, i)); + + return res; + } + catch (IllegalArgumentException e) { + throw new IgniteSpiException(errMsg, e); + } + } + else { + try { + int port = Integer.parseInt(portStr); + + return Collections.singleton(new InetSocketAddress(addrStr, port)); + } + catch (IllegalArgumentException e) { + throw new IgniteSpiException(errMsg, e); + } + } + } + else + throw new IgniteSpiException(errMsg); + } + + /** {@inheritDoc} */ + @Override public synchronized Collection<InetSocketAddress> getRegisteredAddresses() { + return Collections.unmodifiableCollection(addrs); + } + + /** {@inheritDoc} */ + @Override public synchronized void registerAddresses(Collection<InetSocketAddress> addrs) { + assert !F.isEmpty(addrs); + + this.addrs = new LinkedHashSet<>(this.addrs); + + this.addrs.addAll(addrs); + } + + /** {@inheritDoc} */ + @Override public synchronized void unregisterAddresses(Collection<InetSocketAddress> addrs) { + assert !F.isEmpty(addrs); + + this.addrs = new LinkedHashSet<>(this.addrs); + + this.addrs.removeAll(addrs); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryVmIpFinder.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/package.html b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/package.html new file mode 100644 index 0000000..7016083 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains local JVM-based IP finder. +</body> +</html>