http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java deleted file mode 100644 index 68dd179..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java +++ /dev/null @@ -1,639 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.internal; - -import org.apache.ignite.lang.*; -import org.gridgain.grid.spi.discovery.tcp.messages.*; -import org.gridgain.grid.util.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.gridgain.grid.util.tostring.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Statistics for {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi}. - */ -public class TcpDiscoveryStatistics { - /** Join started timestamp. */ - private long joinStartedTs; - - /** Join finished timestamp. */ - private long joinFinishedTs; - - /** Coordinator since timestamp. */ - private final AtomicLong crdSinceTs = new AtomicLong(); - - /** Joined nodes count. */ - private int joinedNodesCnt; - - /** Failed nodes count. */ - private int failedNodesCnt; - - /** Left nodes count. */ - private int leftNodesCnt; - - /** Ack timeouts count. */ - private int ackTimeoutsCnt; - - /** Socket timeouts count. */ - private int sockTimeoutsCnt; - - /** Received messages. */ - @GridToStringInclude - private final Map<String, Integer> rcvdMsgs = new HashMap<>(); - - /** Processed messages. */ - @GridToStringInclude - private final Map<String, Integer> procMsgs = new HashMap<>(); - - /** Average time taken to serialize messages. */ - @GridToStringInclude - private final Map<String, Long> avgMsgsSndTimes = new HashMap<>(); - - /** Average time taken to serialize messages. */ - @GridToStringInclude - private final Map<String, Long> maxMsgsSndTimes = new HashMap<>(); - - /** Sent messages. */ - @GridToStringInclude - private final Map<String, Integer> sentMsgs = new HashMap<>(); - - /** Messages receive timestamps. */ - private final Map<IgniteUuid, Long> msgsRcvTs = new GridBoundedLinkedHashMap<>(1024); - - /** Messages processing start timestamps. */ - private final Map<IgniteUuid, Long> msgsProcStartTs = new GridBoundedLinkedHashMap<>(1024); - - /** Ring messages sent timestamps. */ - private final Map<IgniteUuid, Long> ringMsgsSndTs = new GridBoundedLinkedHashMap<>(1024); - - /** Average time messages is in queue. */ - private long avgMsgQueueTime; - - /** Max time messages is in queue. */ - private long maxMsgQueueTime; - - /** Total number of ring messages sent. */ - private int ringMsgsSent; - - /** Average time it takes for messages to pass the full ring. */ - private long avgRingMsgTime; - - /** Max time it takes for messages to pass the full ring. */ - private long maxRingMsgTime; - - /** Class name of ring message that required the biggest time for full ring traverse. */ - private String maxRingTimeMsgCls; - - /** Average message processing time. */ - private long avgMsgProcTime; - - /** Max message processing time. */ - private long maxMsgProcTime; - - /** Class name of the message that required the biggest time to process. */ - private String maxProcTimeMsgCls; - - /** Socket readers created count. */ - private int sockReadersCreated; - - /** Socket readers removed count. */ - private int sockReadersRmv; - - /** Average time it takes to initialize connection from another node. */ - private long avgSrvSockInitTime; - - /** Max time it takes to initialize connection from another node. */ - private long maxSrvSockInitTime; - - /** Number of outgoing connections established. */ - private int clientSockCreatedCnt; - - /** Average time it takes to connect to another node. */ - private long avgClientSockInitTime; - - /** Max time it takes to connect to another node. */ - private long maxClientSockInitTime; - - /** Pending messages registered count. */ - private int pendingMsgsRegistered; - - /** Pending messages discarded count. */ - private int pendingMsgsDiscarded; - - /** - * Increments joined nodes count. - */ - public synchronized void onNodeJoined() { - joinedNodesCnt++; - } - - /** - * Increments left nodes count. - */ - public synchronized void onNodeLeft() { - leftNodesCnt++; - } - - /** - * Increments failed nodes count. - */ - public synchronized void onNodeFailed() { - failedNodesCnt++; - } - - /** - * Increments ack timeouts count. - */ - public synchronized void onAckTimeout() { - ackTimeoutsCnt++; - } - - /** - * Increments socket timeouts count. - */ - public synchronized void onSocketTimeout() { - sockTimeoutsCnt++; - } - - /** - * Initializes coordinator since date (if needed). - */ - public void onBecomingCoordinator() { - crdSinceTs.compareAndSet(0, U.currentTimeMillis()); - } - - /** - * Initializes join started timestamp. - */ - public synchronized void onJoinStarted() { - joinStartedTs = U.currentTimeMillis(); - } - - /** - * Initializes join finished timestamp. - */ - public synchronized void onJoinFinished() { - joinFinishedTs = U.currentTimeMillis(); - } - - /** - * @return Join started timestamp. - */ - public synchronized long joinStarted() { - return joinStartedTs; - } - - /** - * @return Join finished timestamp. - */ - public synchronized long joinFinished() { - return joinFinishedTs; - } - - /** - * Collects necessary stats for message received by SPI. - * - * @param msg Received message. - */ - public synchronized void onMessageReceived(TcpDiscoveryAbstractMessage msg) { - assert msg != null; - - Integer cnt = F.addIfAbsent(rcvdMsgs, msg.getClass().getSimpleName(), new Callable<Integer>() { - @Override public Integer call() { - return 0; - } - }); - - assert cnt != null; - - rcvdMsgs.put(msg.getClass().getSimpleName(), ++cnt); - - msgsRcvTs.put(msg.id(), U.currentTimeMillis()); - } - - /** - * Collects necessary stats for message processed by SPI. - * - * @param msg Processed message. - */ - public synchronized void onMessageProcessingStarted(TcpDiscoveryAbstractMessage msg) { - assert msg != null; - - Integer cnt = F.addIfAbsent(procMsgs, msg.getClass().getSimpleName(), new Callable<Integer>() { - @Override public Integer call() { - return 0; - } - }); - - assert cnt != null; - - procMsgs.put(msg.getClass().getSimpleName(), ++cnt); - - Long rcvdTs = msgsRcvTs.remove(msg.id()); - - if (rcvdTs != null) { - long duration = U.currentTimeMillis() - rcvdTs; - - if (maxMsgQueueTime < duration) - maxMsgQueueTime = duration; - - avgMsgQueueTime = (avgMsgQueueTime * (totalReceivedMessages() -1)) / totalProcessedMessages(); - } - - msgsProcStartTs.put(msg.id(), U.currentTimeMillis()); - } - - /** - * Collects necessary stats for message processed by SPI. - * - * @param msg Processed message. - */ - public synchronized void onMessageProcessingFinished(TcpDiscoveryAbstractMessage msg) { - assert msg != null; - - Long startTs = msgsProcStartTs.get(msg.id()); - - if (startTs != null) { - long duration = U.currentTimeMillis() - startTs; - - avgMsgProcTime = (avgMsgProcTime * (totalProcessedMessages() - 1) + duration) / totalProcessedMessages(); - - if (duration > maxMsgProcTime) { - maxMsgProcTime = duration; - - maxProcTimeMsgCls = msg.getClass().getSimpleName(); - } - - msgsProcStartTs.remove(msg.id()); - } - } - - /** - * Called by coordinator when ring message is sent. - * - * @param msg Sent message. - * @param time Time taken to serialize message. - */ - public synchronized void onMessageSent(TcpDiscoveryAbstractMessage msg, long time) { - assert msg != null; - assert time >= 0; - - if (crdSinceTs.get() > 0 && - (msg instanceof TcpDiscoveryNodeAddedMessage) || - (msg instanceof TcpDiscoveryNodeLeftMessage) || - (msg instanceof TcpDiscoveryNodeFailedMessage)) { - ringMsgsSndTs.put(msg.id(), U.currentTimeMillis()); - - ringMsgsSent++; - } - - Integer cnt = F.addIfAbsent(sentMsgs, msg.getClass().getSimpleName(), new Callable<Integer>() { - @Override public Integer call() { - return 0; - } - }); - - assert cnt != null; - - sentMsgs.put(msg.getClass().getSimpleName(), ++cnt); - - Long avgTime = F.addIfAbsent(avgMsgsSndTimes, msg.getClass().getSimpleName(), new Callable<Long>() { - @Override public Long call() { - return 0L; - } - }); - - assert avgTime != null; - - avgTime = (avgTime * (cnt - 1) + time) / cnt; - - avgMsgsSndTimes.put(msg.getClass().getSimpleName(), avgTime); - - Long maxTime = F.addIfAbsent(maxMsgsSndTimes, msg.getClass().getSimpleName(), new Callable<Long>() { - @Override public Long call() { - return 0L; - } - }); - - assert maxTime != null; - - if (time > maxTime) - maxMsgsSndTimes.put(msg.getClass().getSimpleName(), time); - } - - /** - * Called by coordinator when ring message makes full pass. - * - * @param msg Message. - */ - public synchronized void onRingMessageReceived(TcpDiscoveryAbstractMessage msg) { - assert msg != null; - - Long sentTs = ringMsgsSndTs.get(msg.id()); - - if (sentTs != null) { - long duration = U.currentTimeMillis() - sentTs; - - if (maxRingMsgTime < duration) { - maxRingMsgTime = duration; - - maxRingTimeMsgCls = msg.getClass().getSimpleName(); - } - - if (ringMsgsSent != 0) - avgRingMsgTime = (avgRingMsgTime * (ringMsgsSent - 1) + duration) / ringMsgsSent; - } - } - - /** - * Gets max time for ring message to make full pass. - * - * @return Max full pass time. - */ - public synchronized long maxRingMessageTime() { - return maxRingMsgTime; - } - - /** - * Gets class name of the message that took max time to make full pass. - * - * @return Message class name. - */ - public synchronized String maxRingDurationMessageClass() { - return maxRingTimeMsgCls; - } - - /** - * Gets class name of the message took max time to process. - * - * @return Message class name. - */ - public synchronized String maxProcessingTimeMessageClass() { - return maxProcTimeMsgCls; - } - - /** - * @param initTime Time socket was initialized in. - */ - public synchronized void onServerSocketInitialized(long initTime) { - assert initTime >= 0; - - if (maxSrvSockInitTime < initTime) - maxSrvSockInitTime = initTime; - - avgSrvSockInitTime = (avgSrvSockInitTime * (sockReadersCreated - 1) + initTime) / sockReadersCreated; - } - - /** - * @param initTime Time socket was initialized in. - */ - public synchronized void onClientSocketInitialized(long initTime) { - assert initTime >= 0; - - clientSockCreatedCnt++; - - if (maxClientSockInitTime < initTime) - maxClientSockInitTime = initTime; - - avgClientSockInitTime = (avgClientSockInitTime * (clientSockCreatedCnt - 1) + initTime) / clientSockCreatedCnt; - } - - /** - * Increments pending messages registered count. - */ - public synchronized void onPendingMessageRegistered() { - pendingMsgsRegistered++; - } - - /** - * Increments pending messages discarded count. - */ - public synchronized void onPendingMessageDiscarded() { - pendingMsgsDiscarded++; - } - - /** - * Increments socket readers created count. - */ - public synchronized void onSocketReaderCreated() { - sockReadersCreated++; - } - - /** - * Increments socket readers removed count. - */ - public synchronized void onSocketReaderRemoved() { - sockReadersRmv++; - } - - /** - * Gets processed messages counts (grouped by type). - * - * @return Map containing message types and respective counts. - */ - public synchronized Map<String, Integer> processedMessages() { - return new HashMap<>(procMsgs); - } - - /** - * Gets received messages counts (grouped by type). - * - * @return Map containing message types and respective counts. - */ - public synchronized Map<String, Integer> receivedMessages() { - return new HashMap<>(rcvdMsgs); - } - - /** - * Gets max messages send time (grouped by type). - * - * @return Map containing messages types and max send times. - */ - public synchronized Map<String, Long> maxMessagesSendTimes() { - return new HashMap<>(maxMsgsSndTimes); - } - - /** - * Gets average messages send time (grouped by type). - * - * @return Map containing messages types and average send times. - */ - public synchronized Map<String, Long> avgMessagesSendTimes() { - return new HashMap<>(avgMsgsSndTimes); - } - - /** - * Gets total received messages count. - * - * @return Total received messages count. - */ - public synchronized int totalReceivedMessages() { - return F.sumInt(receivedMessages().values()); - } - - /** - * Gets total processed messages count. - * - * @return Total processed messages count. - */ - public synchronized int totalProcessedMessages() { - return F.sumInt(processedMessages().values()); - } - - /** - * Gets max message processing time. - * - * @return Max message processing time. - */ - public synchronized long maxMessageProcessingTime(){ - return maxMsgProcTime; - } - - /** - * Gets average message processing time. - * - * @return Average message processing time. - */ - public synchronized long avgMessageProcessingTime() { - return avgMsgProcTime; - } - - /** - * Gets pending messages registered count. - * - * @return Pending messages registered count. - */ - public synchronized long pendingMessagesRegistered() { - return pendingMsgsRegistered; - } - - /** - * Gets pending messages discarded count. - * - * @return Pending messages registered count. - */ - public synchronized long pendingMessagesDiscarded() { - return pendingMsgsDiscarded; - } - - /** - * Gets nodes joined count. - * - * @return Nodes joined count. - */ - public synchronized int joinedNodesCount() { - return joinedNodesCnt; - } - - /** - * Gets nodes left count. - * - * @return Nodes left count. - */ - public synchronized int leftNodesCount() { - return leftNodesCnt; - } - - /** - * Gets failed nodes count. - * - * @return Failed nodes count. - */ - public synchronized int failedNodesCount() { - return failedNodesCnt; - } - - /** - * @return Ack timeouts count. - */ - public synchronized int ackTimeoutsCount() { - return ackTimeoutsCnt; - } - - /** - * @return Socket timeouts count. - */ - public synchronized int socketTimeoutsCount() { - return sockTimeoutsCnt; - } - - /** - * Gets socket readers created count. - * - * @return Socket readers created count. - */ - public synchronized int socketReadersCreated() { - return sockReadersCreated; - } - - /** - * Gets socket readers removed count. - * - * @return Socket readers removed count. - */ - public synchronized int socketReadersRemoved() { - return sockReadersRmv; - } - - /** - * Gets time local node has been coordinator since. - * - * @return Coordinator since timestamp. - */ - public long coordinatorSinceTimestamp() { - return crdSinceTs.get(); - } - - /** - * Clears statistics. - */ - public synchronized void clear() { - ackTimeoutsCnt = 0; - avgClientSockInitTime = 0; - avgMsgProcTime = 0; - avgMsgQueueTime = 0; - avgMsgsSndTimes.clear(); - avgRingMsgTime = 0; - avgSrvSockInitTime = 0; - clientSockCreatedCnt = 0; - crdSinceTs.set(0); - failedNodesCnt = 0; - joinedNodesCnt = 0; - joinFinishedTs = 0; - joinStartedTs = 0; - leftNodesCnt = 0; - maxClientSockInitTime = 0; - maxMsgProcTime = 0; - maxMsgQueueTime = 0; - maxMsgsSndTimes.clear(); - maxProcTimeMsgCls = null; - maxRingMsgTime = 0; - maxRingTimeMsgCls = null; - maxSrvSockInitTime = 0; - pendingMsgsDiscarded = 0; - pendingMsgsRegistered = 0; - procMsgs.clear(); - rcvdMsgs.clear(); - ringMsgsSent = 0; - sentMsgs.clear(); - sockReadersCreated = 0; - sockReadersRmv = 0; - sockTimeoutsCnt = 0; - } - - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return S.toString(TcpDiscoveryStatistics.class, this); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/package.html b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/package.html deleted file mode 100644 index b44e05c..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/package.html +++ /dev/null @@ -1,15 +0,0 @@ -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<!-- - @html.file.header - _________ _____ __________________ _____ - __ ____/___________(_)______ /__ ____/______ ____(_)_______ - _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ ---> -<html> -<body> - <!-- Package description. --> - Contains internal implementation. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java deleted file mode 100644 index b5ff462..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java +++ /dev/null @@ -1,90 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.ipfinder; - -import org.apache.ignite.spi.*; - -import java.net.*; -import java.util.*; - -/** - * IP finder interface for {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi}. - */ -public interface TcpDiscoveryIpFinder { - /** - * Callback invoked when SPI context is initialized after {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi#spiStart(String)} - * method is completed, SPI context can be stored for future access. - * - * @param spiCtx Spi context. - * @throws org.apache.ignite.spi.IgniteSpiException In case of error. - */ - public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException; - - /** - * Callback invoked prior to stopping grid before SPI context is destroyed. - * Note that invoking SPI context after this callback is complete is considered - * illegal and may produce unknown results. - */ - public void onSpiContextDestroyed(); - - /** - * Initializes addresses discovery SPI binds to. - * - * @param addrs Addresses discovery SPI binds to. - * @throws org.apache.ignite.spi.IgniteSpiException In case of error. - */ - public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException; - - /** - * Gets all addresses registered in this finder. - * - * @return All known addresses, potentially empty, but never {@code null}. - * @throws org.apache.ignite.spi.IgniteSpiException In case of error. - */ - public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException; - - /** - * Checks whether IP finder is shared or not. - * <p> - * If it is shared then only coordinator can unregister addresses. - * <p> - * All nodes should register their address themselves, as early as possible on node start. - * - * @return {@code true} if IP finder is shared. - */ - public boolean isShared(); - - /** - * Registers new addresses. - * <p> - * Implementation should accept duplicates quietly, but should not register address if it - * is already registered. - * - * @param addrs Addresses to register. Not {@code null} and not empty. - * @throws org.apache.ignite.spi.IgniteSpiException In case of error. - */ - public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException; - - /** - * Unregisters provided addresses. - * <p> - * Implementation should accept addresses that are currently not - * registered quietly (just no-op). - * - * @param addrs Addresses to unregister. Not {@code null} and not empty. - * @throws org.apache.ignite.spi.IgniteSpiException In case of error. - */ - public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException; - - /** - * Closes this IP finder and releases any system resources associated with it. - */ - public void close(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java deleted file mode 100644 index 27eb5ed..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java +++ /dev/null @@ -1,77 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.ipfinder; - -import org.apache.ignite.spi.*; -import org.gridgain.grid.util.tostring.*; -import org.gridgain.grid.util.typedef.internal.*; - -import java.net.*; -import java.util.*; - -/** - * IP finder interface implementation adapter. - */ -public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinder { - /** Shared flag. */ - private boolean shared; - - /** SPI context. */ - @GridToStringExclude - private volatile IgniteSpiContext spiCtx; - - /** {@inheritDoc} */ - @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException { - this.spiCtx = spiCtx; - } - - /** {@inheritDoc} */ - @Override public void onSpiContextDestroyed() { - spiCtx = null; - } - - /** {@inheritDoc} */ - @Override public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { - registerAddresses(addrs); - } - - /** {@inheritDoc} */ - @Override public boolean isShared() { - return shared; - } - - /** - * Sets shared flag. If {@code true} then it is expected that IP addresses registered - * with IP finder will be seen by IP finders on all other nodes. - * - * @param shared {@code true} if this IP finder is shared. - */ - @IgniteSpiConfiguration(optional = true) - public void setShared(boolean shared) { - this.shared = shared; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TcpDiscoveryIpFinderAdapter.class, this); - } - - /** {@inheritDoc} */ - @Override public void close() { - // No-op. - } - - /** - * @return SPI context. - */ - protected IgniteSpiContext spiContext() { - return spiCtx; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java deleted file mode 100644 index 4b5c00b..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java +++ /dev/null @@ -1,361 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.ipfinder.jdbc; - -import org.apache.ignite.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.gridgain.grid.util.tostring.*; - -import javax.sql.*; -import java.net.*; -import java.sql.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.sql.Connection.*; - -/** - * JDBC-based IP finder. - * <h1 class="header">Configuration</h1> - * <h2 class="header">Mandatory</h2> - * <ul> - * <li>Data source (see {@link #setDataSource(DataSource)}).</li> - * </ul> - * <h2 class="header">Optional</h2> - * The following configuration parameters are optional: - * <ul> - * <li>Flag indicating whether DB schema should be initialized by GridGain (default behaviour) or - * was explicitly created by user (see {@link #setInitSchema(boolean)})</li> - * </ul> - * <p> - * The database will contain 1 table which will hold IP addresses. - */ -public class TcpDiscoveryJdbcIpFinder extends TcpDiscoveryIpFinderAdapter { - /** Query to get addresses. */ - public static final String GET_ADDRS_QRY = "select hostname, port from tbl_addrs"; - - /** Query to register address. */ - public static final String REG_ADDR_QRY = "insert into tbl_addrs values (?, ?)"; - - /** Query to unregister address. */ - public static final String UNREG_ADDR_QRY = "delete from tbl_addrs where hostname = ? and port = ?"; - - /** Query to create addresses table. */ - public static final String CREATE_ADDRS_TABLE_QRY = - "create table if not exists tbl_addrs (" + - "hostname VARCHAR(1024), " + - "port INT)"; - - /** Query to check database validity. */ - public static final String CHK_QRY = "select count(*) from tbl_addrs"; - - /** Grid logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** Data source. */ - private DataSource dataSrc; - - /** Flag for schema initialization. */ - private boolean initSchema = true; - - /** Init guard. */ - @GridToStringExclude - private final AtomicBoolean initGuard = new AtomicBoolean(); - - /** Init latch. */ - @GridToStringExclude - private final CountDownLatch initLatch = new CountDownLatch(1); - - /** - * Constructor. - */ - public TcpDiscoveryJdbcIpFinder() { - setShared(true); - } - - /** {@inheritDoc} */ - @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException { - init(); - - Connection conn = null; - - PreparedStatement stmt = null; - - ResultSet rs = null; - - try { - conn = dataSrc.getConnection(); - - conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); - - stmt = conn.prepareStatement(GET_ADDRS_QRY); - - rs = stmt.executeQuery(); - - Collection<InetSocketAddress> addrs = new LinkedList<>(); - - while (rs.next()) - addrs.add(new InetSocketAddress(rs.getString(1), rs.getInt(2))); - - return addrs; - } - catch (SQLException e) { - throw new IgniteSpiException("Failed to get registered addresses version.", e); - } - finally { - U.closeQuiet(rs); - U.closeQuiet(stmt); - U.closeQuiet(conn); - } - } - - /** {@inheritDoc} */ - @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { - assert !F.isEmpty(addrs); - - init(); - - Connection conn = null; - - PreparedStatement stmtUnreg = null; - - PreparedStatement stmtReg = null; - - boolean committed = false; - - try { - conn = dataSrc.getConnection(); - - conn.setAutoCommit(false); - - conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); - - stmtUnreg = conn.prepareStatement(UNREG_ADDR_QRY); - stmtReg = conn.prepareStatement(REG_ADDR_QRY); - - for (InetSocketAddress addr : addrs) { - stmtUnreg.setString(1, addr.getAddress().getHostAddress()); - stmtUnreg.setInt(2, addr.getPort()); - - stmtUnreg.addBatch(); - - stmtReg.setString(1, addr.getAddress().getHostAddress()); - stmtReg.setInt(2, addr.getPort()); - - stmtReg.addBatch(); - } - - stmtUnreg.executeBatch(); - stmtUnreg.close(); - - stmtReg.executeBatch(); - stmtReg.close(); - - conn.commit(); - - committed = true; - } - catch (SQLException e) { - U.rollbackConnectionQuiet(conn); - - throw new IgniteSpiException("Failed to register addresses: " + addrs, e); - } - finally { - if (!committed) - U.rollbackConnectionQuiet(conn); - - U.closeQuiet(stmtUnreg); - U.closeQuiet(stmtReg); - U.closeQuiet(conn); - } - } - - /** {@inheritDoc} */ - @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { - assert !F.isEmpty(addrs); - - init(); - - Connection conn = null; - - PreparedStatement stmt = null; - - boolean committed = false; - - try { - conn = dataSrc.getConnection(); - - conn.setAutoCommit(false); - - conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); - - stmt = conn.prepareStatement(UNREG_ADDR_QRY); - - for (InetSocketAddress addr : addrs) { - stmt.setString(1, addr.getAddress().getHostAddress()); - stmt.setInt(2, addr.getPort()); - - stmt.addBatch(); - } - - stmt.executeBatch(); - conn.commit(); - - committed = true; - } - catch (SQLException e) { - U.rollbackConnectionQuiet(conn); - - throw new IgniteSpiException("Failed to unregister addresses: " + addrs, e); - } - finally { - if (!committed) - U.rollbackConnectionQuiet(conn); - - U.closeQuiet(stmt); - U.closeQuiet(conn); - } - } - - /** - * Sets data source. - * <p> - * Data source should be fully configured and ready-to-use. - * - * @param dataSrc Data source. - */ - @IgniteSpiConfiguration(optional = false) - public void setDataSource(DataSource dataSrc) { - this.dataSrc = dataSrc; - } - - /** - * Flag indicating whether DB schema should be initialized by GridGain (default behaviour) or - * was explicitly created by user. - * - * @param initSchema {@code True} if DB schema should be initialized by GridGain (default behaviour), - * {code @false} if schema was explicitly created by user. - */ - @IgniteSpiConfiguration(optional = true) - public void setInitSchema(boolean initSchema) { - this.initSchema = initSchema; - } - - /** - * Checks configuration validity. - * - * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. - */ - private void init() throws IgniteSpiException { - if (initGuard.compareAndSet(false, true)) { - if (dataSrc == null) - throw new IgniteSpiException("Data source is null (you must configure it via setDataSource(..)" + - " configuration property)"); - - if (!initSchema) { - initLatch.countDown(); - - checkSchema(); - - return; - } - - Connection conn = null; - - Statement stmt = null; - - boolean committed = false; - - try { - conn = dataSrc.getConnection(); - - conn.setAutoCommit(false); - - conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); - - // Create tbl_addrs. - stmt = conn.createStatement(); - - stmt.executeUpdate(CREATE_ADDRS_TABLE_QRY); - - conn.commit(); - - committed = true; - - if (log.isDebugEnabled()) - log.debug("DB schema has been initialized."); - } - catch (SQLException e) { - U.rollbackConnectionQuiet(conn); - - throw new IgniteSpiException("Failed to initialize DB schema.", e); - } - finally { - if (!committed) - U.rollbackConnectionQuiet(conn); - - U.closeQuiet(stmt); - U.closeQuiet(conn); - - initLatch.countDown(); - } - } - else - checkSchema(); - } - - /** - * Checks correctness of existing DB schema. - * - * @throws org.apache.ignite.spi.IgniteSpiException If schema wasn't properly initialized. - */ - private void checkSchema() throws IgniteSpiException { - try { - U.await(initLatch); - } - catch (GridInterruptedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - - Connection conn = null; - - Statement stmt = null; - - try { - conn = dataSrc.getConnection(); - - conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); - - // Check if tbl_addrs exists and database initialized properly. - stmt = conn.createStatement(); - - stmt.execute(CHK_QRY); - } - catch (SQLException e) { - throw new IgniteSpiException("IP finder has not been properly initialized.", e); - } - finally { - U.closeQuiet(stmt); - U.closeQuiet(conn); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TcpDiscoveryJdbcIpFinder.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/package.html b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/package.html deleted file mode 100644 index dfe23f6..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/package.html +++ /dev/null @@ -1,15 +0,0 @@ -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<!-- - @html.file.header - _________ _____ __________________ _____ - __ ____/___________(_)______ /__ ____/______ ____(_)_______ - _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ ---> -<html> -<body> - <!-- Package description. --> - Contains JDBC IP finder implementation. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java deleted file mode 100644 index acc20a4..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java +++ /dev/null @@ -1,752 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast; - -import org.apache.ignite.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.jdk.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.gridgain.grid.util.tostring.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -import static org.apache.ignite.IgniteSystemProperties.*; -import static org.apache.ignite.spi.IgnitePortProtocol.*; - -/** - * Multicast-based IP finder. - * <p> - * When TCP discovery starts this finder sends multicast request and waits - * for some time when others nodes reply to this request with messages containing - * their addresses (time IP finder waits for response and number of attempts to - * re-send multicast request in case if no replies are received can be configured, - * see {@link #setResponseWaitTime(int)} and {@link #setAddressRequestAttempts(int)}). - * <p> - * In addition to address received via multicast this finder can work with pre-configured - * list of addresses specified via {@link #setAddresses(Collection)} method. - * <h1 class="header">Configuration</h1> - * <h2 class="header">Mandatory</h2> - * There are no mandatory configuration parameters. - * <h2 class="header">Optional</h2> - * <ul> - * <li>Multicast IP address (see {@link #setMulticastGroup(String)}).</li> - * <li>Multicast port number (see {@link #setMulticastPort(int)}).</li> - * <li>Address response wait time (see {@link #setResponseWaitTime(int)}).</li> - * <li>Address request attempts (see {@link #setAddressRequestAttempts(int)}).</li> - * <li>Pre-configured addresses (see {@link #setAddresses(Collection)})</li> - * <li>Local address (see {@link #setLocalAddress(String)})</li> - * </ul> - */ -public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { - /** Default multicast IP address (value is {@code 228.1.2.4}). */ - public static final String DFLT_MCAST_GROUP = "228.1.2.4"; - - /** Default multicast port number (value is {@code 47400}). */ - public static final int DFLT_MCAST_PORT = 47400; - - /** Default time IP finder waits for reply to multicast address request (value is {@code 500}). */ - public static final int DFLT_RES_WAIT_TIME = 500; - - /** Default number of attempts to send multicast address request (value is {@code 2}). */ - public static final int DFLT_ADDR_REQ_ATTEMPTS = 2; - - /** Address request message data. */ - private static final byte[] MSG_ADDR_REQ_DATA = U.GG_HEADER; - - /** */ - private static final IgniteMarshaller marsh = new IgniteJdkMarshaller(); - - /** Grid logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** Grid name. */ - @IgniteNameResource - @GridToStringExclude - private String gridName; - - /** Multicast IP address as string. */ - private String mcastGrp = DFLT_MCAST_GROUP; - - /** Multicast port number. */ - private int mcastPort = DFLT_MCAST_PORT; - - /** Time IP finder waits for reply to multicast address request. */ - private int resWaitTime = DFLT_RES_WAIT_TIME; - - /** Number of attempts to send multicast address request. */ - private int addrReqAttempts = DFLT_ADDR_REQ_ATTEMPTS; - - /** Local address */ - private String locAddr; - - /** */ - @GridToStringExclude - private Collection<AddressSender> addrSnds; - - /** - * Constructs new IP finder. - */ - public TcpDiscoveryMulticastIpFinder() { - setShared(true); - } - - /** - * Sets IP address of multicast group. - * <p> - * If not provided, default value is {@link #DFLT_MCAST_GROUP}. - * - * @param mcastGrp Multicast IP address. - */ - @IgniteSpiConfiguration(optional = true) - public void setMulticastGroup(String mcastGrp) { - this.mcastGrp = mcastGrp; - } - - /** - * Gets IP address of multicast group. - * - * @return Multicast IP address. - */ - public String getMulticastGroup() { - return mcastGrp; - } - - /** - * Sets port number which multicast messages are sent to. - * <p> - * If not provided, default value is {@link #DFLT_MCAST_PORT}. - * - * @param mcastPort Multicast port number. - */ - @IgniteSpiConfiguration(optional = true) - public void setMulticastPort(int mcastPort) { - this.mcastPort = mcastPort; - } - - /** - * Gets port number which multicast messages are sent to. - * - * @return Port number. - */ - public int getMulticastPort() { - return mcastPort; - } - - /** - * Sets time in milliseconds IP finder waits for reply to - * multicast address request. - * <p> - * If not provided, default value is {@link #DFLT_RES_WAIT_TIME}. - * - * @param resWaitTime Time IP finder waits for reply to multicast address request. - */ - @IgniteSpiConfiguration(optional = true) - public void setResponseWaitTime(int resWaitTime) { - this.resWaitTime = resWaitTime; - } - - /** - * Gets time in milliseconds IP finder waits for reply to - * multicast address request. - * - * @return Time IP finder waits for reply to multicast address request. - */ - public int getResponseWaitTime() { - return resWaitTime; - } - - /** - * Sets number of attempts to send multicast address request. IP finder re-sends - * request only in case if no reply for previous request is received. - * <p> - * If not provided, default value is {@link #DFLT_ADDR_REQ_ATTEMPTS}. - * - * @param addrReqAttempts Number of attempts to send multicast address request. - */ - @IgniteSpiConfiguration(optional = true) - public void setAddressRequestAttempts(int addrReqAttempts) { - this.addrReqAttempts = addrReqAttempts; - } - - /** - * Gets number of attempts to send multicast address request. IP finder re-sends - * request only in case if no reply for previous request is received. - * - * @return Number of attempts to send multicast address request. - */ - public int getAddressRequestAttempts() { - return addrReqAttempts; - } - - /** - * Sets local host address used by this IP finder. If provided address is non-loopback then multicast - * socket is bound to this interface. If local address is not set or is any local address then IP finder - * creates multicast sockets for all found non-loopback addresses. - * <p> - * If not provided then this property is initialized by the local address set in {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi} - * configuration. - * - * @param locAddr Local host address. - * @see org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi#setLocalAddress(String) - */ - @IgniteSpiConfiguration(optional = true) - public void setLocalAddress(String locAddr) { - this.locAddr = locAddr; - } - - /** - * Gets local address that multicast IP finder uses. - * - * @return Local address. - */ - public String getLocalAddress() { - return locAddr; - } - - /** {@inheritDoc} */ - @Override public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { - // If GRIDGAIN_OVERRIDE_MCAST_GRP system property is set, use its value to override multicast group from - // configuration. Used for testing purposes. - String overrideMcastGrp = System.getProperty(GG_OVERRIDE_MCAST_GRP); - - if (overrideMcastGrp != null) - mcastGrp = overrideMcastGrp; - - if (F.isEmpty(mcastGrp)) - throw new IgniteSpiException("Multicast IP address is not specified."); - - if (mcastPort < 0 || mcastPort > 65535) - throw new IgniteSpiException("Invalid multicast port: " + mcastPort); - - if (resWaitTime <= 0) - throw new IgniteSpiException("Invalid wait time, value greater than zero is expected: " + resWaitTime); - - if (addrReqAttempts <= 0) - throw new IgniteSpiException("Invalid number of address request attempts, " + - "value greater than zero is expected: " + addrReqAttempts); - - if (F.isEmpty(getRegisteredAddresses())) - U.warn(log, "GridTcpDiscoveryMulticastIpFinder has no pre-configured addresses " + - "(it is recommended in production to specify at least one address in " + - "GridTcpDiscoveryMulticastIpFinder.getAddresses() configuration property)"); - - InetAddress mcastAddr; - - try { - mcastAddr = InetAddress.getByName(mcastGrp); - } - catch (UnknownHostException e) { - throw new IgniteSpiException("Unknown multicast group: " + mcastGrp, e); - } - - if (!mcastAddr.isMulticastAddress()) - throw new IgniteSpiException("Invalid multicast group address: " + mcastAddr); - - Collection<String> locAddrs; - - try { - locAddrs = U.resolveLocalAddresses(U.resolveLocalHost(locAddr)).get1(); - } - catch (IOException | GridException e) { - throw new IgniteSpiException("Failed to resolve local addresses [locAddr=" + locAddr + ']', e); - } - - assert locAddrs != null; - - addrSnds = new ArrayList<>(locAddrs.size()); - - Collection<InetAddress> reqItfs = new ArrayList<>(locAddrs.size()); // Interfaces used to send requests. - - for (String locAddr : locAddrs) { - InetAddress addr; - - try { - addr = InetAddress.getByName(locAddr); - } - catch (UnknownHostException e) { - if (log.isDebugEnabled()) - log.debug("Failed to resolve local address [locAddr=" + locAddr + ", err=" + e + ']'); - - continue; - } - - if (!addr.isLoopbackAddress()) { - try { - addrSnds.add(new AddressSender(mcastAddr, addr, addrs)); - - reqItfs.add(addr); - } - catch (IOException e) { - if (log.isDebugEnabled()) - log.debug("Failed to create multicast socket [mcastAddr=" + mcastAddr + - ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ", locAddr=" + addr + - ", err=" + e + ']'); - } - } - } - - if (addrSnds.isEmpty()) { - try { - // Create non-bound socket if local host is loopback or failed to create sockets explicitly - // bound to interfaces. - addrSnds.add(new AddressSender(mcastAddr, null, addrs)); - } - catch (IOException e) { - throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr + - ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e); - } - } - - for (AddressSender addrSnd :addrSnds) - addrSnd.start(); - - Collection<InetSocketAddress> ret; - - if (reqItfs.size() > 1) { - ret = new HashSet<>(); - - Collection<AddressReceiver> rcvrs = new ArrayList<>(); - - for (InetAddress itf : reqItfs) { - AddressReceiver rcvr = new AddressReceiver(mcastAddr, itf); - - rcvr.start(); - - rcvrs.add(rcvr); - } - - for (AddressReceiver rcvr : rcvrs) { - try { - rcvr.join(); - - ret.addAll(rcvr.addresses()); - } - catch (InterruptedException ignore) { - U.warn(log, "Got interrupted while receiving address request."); - - Thread.currentThread().interrupt(); - - break; - } - } - } - else - ret = requestAddresses(mcastAddr, F.first(reqItfs)); - - if (!ret.isEmpty()) - registerAddresses(ret); - } - - /** {@inheritDoc} */ - @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException { - super.onSpiContextInitialized(spiCtx); - - spiCtx.registerPort(mcastPort, UDP); - } - - /** - * Sends multicast address request message and waits for reply. Response wait time and number - * of request attempts are configured as properties {@link #setResponseWaitTime} and - * {@link #setAddressRequestAttempts}. - * - * @param mcastAddr Multicast address where to send request. - * @param sockItf Optional interface multicast socket should be bound to. - * @return Collection of received addresses. - */ - private Collection<InetSocketAddress> requestAddresses(InetAddress mcastAddr, @Nullable InetAddress sockItf) { - Collection<InetSocketAddress> rmtAddrs = new HashSet<>(); - - try { - DatagramPacket reqPckt = new DatagramPacket(MSG_ADDR_REQ_DATA, MSG_ADDR_REQ_DATA.length, - mcastAddr, mcastPort); - - byte[] resData = new byte[AddressResponse.MAX_DATA_LENGTH]; - - DatagramPacket resPckt = new DatagramPacket(resData, resData.length); - - boolean sndError = false; - - for (int i = 0; i < addrReqAttempts; i++) { - MulticastSocket sock = null; - - try { - sock = new MulticastSocket(0); - - // Use 'false' to enable support for more than one node on the same machine. - sock.setLoopbackMode(false); - - if (sockItf != null) - sock.setInterface(sockItf); - - sock.setSoTimeout(resWaitTime); - - reqPckt.setData(MSG_ADDR_REQ_DATA); - - try { - sock.send(reqPckt); - } - catch (IOException e) { - if (!handleNetworkError(e)) - break; - - if (i < addrReqAttempts - 1) { - if (log.isDebugEnabled()) - log.debug("Failed to send multicast address request (will retry in 500 ms): " + e); - - U.sleep(500); - } - else { - if (log.isDebugEnabled()) - log.debug("Failed to send multicast address request: " + e); - } - - sndError = true; - - continue; - } - - long rcvEnd = U.currentTimeMillis() + resWaitTime; - - try { - while (U.currentTimeMillis() < rcvEnd) { // Try to receive multiple responses. - sock.receive(resPckt); - - byte[] data = resPckt.getData(); - - if (!U.bytesEqual(U.GG_HEADER, 0, data, 0, U.GG_HEADER.length)) { - U.error(log, "Failed to verify message header."); - - continue; - } - - AddressResponse addrRes; - - try { - addrRes = new AddressResponse(data); - } - catch (GridException e) { - LT.warn(log, e, "Failed to deserialize multicast response."); - - continue; - } - - rmtAddrs.addAll(addrRes.addresses()); - } - } - catch (SocketTimeoutException ignored) { - if (log.isDebugEnabled()) // DatagramSocket.receive timeout has expired. - log.debug("Address receive timeout."); - } - } - catch (IOException e) { - U.error(log, "Failed to request nodes addresses.", e); - } - finally { - U.close(sock); - } - - if (!rmtAddrs.isEmpty()) - break; - - if (i < addrReqAttempts - 1) // Wait some time before re-sending address request. - U.sleep(200); - } - - if (log.isDebugEnabled()) - log.debug("Received nodes addresses: " + rmtAddrs); - - if (rmtAddrs.isEmpty() && sndError) - U.quietAndWarn(log, "Failed to send multicast message (is multicast enabled on this node?)."); - - return rmtAddrs; - } - catch (GridInterruptedException ignored) { - U.warn(log, "Got interrupted while sending address request."); - - Thread.currentThread().interrupt(); - - return rmtAddrs; - } - } - - /** {@inheritDoc} */ - @Override public void close() { - for (AddressSender addrSnd : addrSnds) - U.interrupt(addrSnd); - - for (AddressSender addrSnd : addrSnds) - U.join(addrSnd, log); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TcpDiscoveryMulticastIpFinder.class, this, "super", super.toString()); - } - - /** - * @param e Network error to handle. - * @return {@code True} if this error is recoverable and the operation can be retried. - */ - private boolean handleNetworkError(IOException e) { - if ("Network is unreachable".equals(e.getMessage()) && U.isMacOs()) { - U.warn(log, "Multicast does not work on Mac OS JVM loopback address (configure external IP address " + - "for 'localHost' configuration property)"); - - return false; - } - - return true; - } - - /** - * Response to multicast address request. - */ - private static class AddressResponse { - /** Maximum supported multicast message. */ - public static final int MAX_DATA_LENGTH = 64 * 1024; - - /** */ - private byte[] data; - - /** */ - private Collection<InetSocketAddress> addrs; - - /** - * @param addrs Addresses discovery SPI binds to. - * @throws GridException If marshalling failed. - */ - private AddressResponse(Collection<InetSocketAddress> addrs) throws GridException { - this.addrs = addrs; - - byte[] addrsData = marsh.marshal(addrs); - data = new byte[U.GG_HEADER.length + addrsData.length]; - - if (data.length > MAX_DATA_LENGTH) - throw new GridException("Too long data packet [size=" + data.length + ", max=" + MAX_DATA_LENGTH + "]"); - - System.arraycopy(U.GG_HEADER, 0, data, 0, U.GG_HEADER.length); - System.arraycopy(addrsData, 0, data, 4, addrsData.length); - } - - /** - * @param data Message data. - * @throws GridException If unmarshalling failed. - */ - private AddressResponse(byte[] data) throws GridException { - assert U.bytesEqual(U.GG_HEADER, 0, data, 0, U.GG_HEADER.length); - - this.data = data; - - addrs = marsh.unmarshal(Arrays.copyOfRange(data, U.GG_HEADER.length, data.length), null); - } - - /** - * @return Message data. - */ - byte[] data() { - return data; - } - - /** - * @return IP address discovery SPI binds to. - */ - public Collection<InetSocketAddress> addresses() { - return addrs; - } - } - - /** - * Thread sends multicast address request message and waits for reply. - */ - private class AddressReceiver extends IgniteSpiThread { - /** */ - private final InetAddress mcastAddr; - - /** */ - private final InetAddress sockAddr; - - /** */ - private Collection<InetSocketAddress> addrs; - - /** - * @param mcastAddr Multicast address where to send request. - * @param sockAddr Optional address multicast socket should be bound to. - */ - private AddressReceiver(InetAddress mcastAddr, InetAddress sockAddr) { - super(gridName, "tcp-disco-multicast-addr-rcvr", log); - this.mcastAddr = mcastAddr; - this.sockAddr = sockAddr; - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - addrs = requestAddresses(mcastAddr, sockAddr); - } - - /** - * @return Received addresses. - */ - Collection<InetSocketAddress> addresses() { - return addrs; - } - } - - /** - * Thread listening for multicast address requests and sending response - * containing socket address this node's discovery SPI listens to. - */ - private class AddressSender extends IgniteSpiThread { - /** */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private MulticastSocket sock; - - /** */ - private final InetAddress mcastGrp; - - /** */ - private final Collection<InetSocketAddress> addrs; - - /** */ - private final InetAddress sockItf; - - /** - * @param mcastGrp Multicast address. - * @param sockItf Optional interface multicast socket should be bound to. - * @param addrs Local node addresses. - * @throws IOException If fails to create multicast socket. - */ - private AddressSender(InetAddress mcastGrp, @Nullable InetAddress sockItf, Collection<InetSocketAddress> addrs) - throws IOException { - super(gridName, "tcp-disco-multicast-addr-sender", log); - this.mcastGrp = mcastGrp; - this.addrs = addrs; - this.sockItf = sockItf; - - sock = createSocket(); - } - - /** - * Creates multicast socket and joins multicast group. - * - * @throws IOException If fails to create socket or join multicast group. - * @return Multicast socket. - */ - private MulticastSocket createSocket() throws IOException { - MulticastSocket sock = new MulticastSocket(mcastPort); - - sock.setLoopbackMode(false); // Use 'false' to enable support for more than one node on the same machine. - - if (sockItf != null) - sock.setInterface(sockItf); - - if (sock.getLoopbackMode()) - U.warn(log, "Loopback mode is disabled which prevents nodes on the same machine from discovering " + - "each other."); - - sock.joinGroup(mcastGrp); - - return sock; - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - AddressResponse res; - - try { - res = new AddressResponse(addrs); - } - catch (GridException e) { - U.error(log, "Failed to prepare multicast message.", e); - - return; - } - - byte[] reqData = new byte[MSG_ADDR_REQ_DATA.length]; - - DatagramPacket pckt = new DatagramPacket(reqData, reqData.length); - - while (!isInterrupted()) { - try { - MulticastSocket sock; - - synchronized (this) { - if (isInterrupted()) - return; - - sock = this.sock; - - if (sock == null) - sock = createSocket(); - } - - sock.receive(pckt); - - if (!U.bytesEqual(U.GG_HEADER, 0, reqData, 0, U.GG_HEADER.length)) { - U.error(log, "Failed to verify message header."); - - continue; - } - - try { - sock.send(new DatagramPacket(res.data(), res.data().length, pckt.getAddress(), pckt.getPort())); - } - catch (IOException e) { - if (e.getMessage().contains("Operation not permitted")) { - if (log.isDebugEnabled()) - log.debug("Got 'operation not permitted' error, ignoring: " + e); - } - else - throw e; - } - } - catch (IOException e) { - if (!isInterrupted()) { - U.error(log, "Failed to send/receive address message (will try to reconnect).", e); - - synchronized (this) { - U.close(sock); - - sock = null; - } - } - } - } - } - - /** {@inheritDoc} */ - @Override public void interrupt() { - super.interrupt(); - - synchronized (this) { - U.close(sock); - - sock = null; - } - } - - /** {@inheritDoc} */ - @Override protected void cleanup() { - synchronized (this) { - U.close(sock); - - sock = null; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/package.html b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/package.html deleted file mode 100644 index c3ced9e..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/package.html +++ /dev/null @@ -1,15 +0,0 @@ -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<!-- - @html.file.header - _________ _____ __________________ _____ - __ ____/___________(_)______ /__ ____/______ ____(_)_______ - _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ ---> -<html> -<body> - <!-- Package description. --> - Contains multicast-based IP finder. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/package.html b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/package.html deleted file mode 100644 index 0a05db1..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/package.html +++ /dev/null @@ -1,15 +0,0 @@ -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<!-- - @html.file.header - _________ _____ __________________ _____ - __ ____/___________(_)______ /__ ____/______ ____(_)_______ - _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ ---> -<html> -<body> - <!-- Package description. --> - Contains IP finder interface and adapter. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java deleted file mode 100644 index 7ee5631..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java +++ /dev/null @@ -1,258 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.ipfinder.sharedfs; - -import org.apache.ignite.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.gridgain.grid.util.tostring.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Shared filesystem-based IP finder. - * <h1 class="header">Configuration</h1> - * <h2 class="header">Mandatory</h2> - * There are no mandatory configuration parameters. - * <h2 class="header">Optional</h2> - * <ul> - * <li>Path (see {@link #setPath(String)})</li> - * <li>Shared flag (see {@link #setShared(boolean)})</li> - * </ul> - * <p> - * If {@link #getPath()} is not provided, then {@link #DFLT_PATH} will be used and - * only local nodes will discover each other. To enable discovery over network - * you must provide a path to a shared directory explicitly. - * <p> - * The directory will contain empty files named like the following 192.168.1.136#1001. - * <p> - * Note that this finder is shared by default (see {@link org.gridgain.grid.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()}. - */ -public class TcpDiscoverySharedFsIpFinder extends TcpDiscoveryIpFinderAdapter { - /** - * Default path for discovering of local nodes (testing only). Note that this path is relative to - * {@code GRIDGAIN_HOME/work} folder if {@code GRIDGAIN_HOME} system or environment variable specified, - * otherwise it is relative to {@code work} folder under system {@code java.io.tmpdir} folder. - * - * @see org.apache.ignite.configuration.IgniteConfiguration#getWorkDirectory() - */ - public static final String DFLT_PATH = "disco/tcp"; - - /** Delimiter to use between address and port tokens in file names. */ - public static final String DELIM = "#"; - - /** Grid logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** File-system path. */ - private String path = DFLT_PATH; - - /** Folder to keep items in. */ - @GridToStringExclude - private File folder; - - /** Warning guard. */ - @GridToStringExclude - private final AtomicBoolean warnGuard = new AtomicBoolean(); - - /** Init guard. */ - @GridToStringExclude - private final AtomicBoolean initGuard = new AtomicBoolean(); - - /** Init latch. */ - @GridToStringExclude - private final CountDownLatch initLatch = new CountDownLatch(1); - - /** - * Constructor. - */ - public TcpDiscoverySharedFsIpFinder() { - setShared(true); - } - - /** - * Gets path. - * - * @return Shared path. - */ - public String getPath() { - return path; - } - - /** - * Sets path. - * - * @param path Shared path. - */ - @IgniteSpiConfiguration(optional = true) - public void setPath(String path) { - this.path = path; - } - - /** - * Initializes folder to work with. - * - * @return Folder. - * @throws org.apache.ignite.spi.IgniteSpiException If failed. - */ - private File initFolder() throws IgniteSpiException { - if (initGuard.compareAndSet(false, true)) { - if (path == null) - throw new IgniteSpiException("Shared file system path is null " + - "(it should be configured via setPath(..) configuration property)."); - - if (path.equals(DFLT_PATH) && warnGuard.compareAndSet(false, true)) - U.warn(log, "Default local computer-only share is used by IP finder."); - - try { - File tmp; - - if (new File(path).exists()) - tmp = new File(path); - else { - try { - tmp = U.resolveWorkDirectory(path, false); - } - catch (GridException e) { - throw new IgniteSpiException("Failed to resolve directory [path=" + path + - ", exception=" + e.getMessage() + ']'); - } - } - - if (!tmp.isDirectory()) - throw new IgniteSpiException("Failed to initialize shared file system path " + - "(path must point to folder): " + path); - - if (!tmp.canRead() || !tmp.canWrite()) - throw new IgniteSpiException("Failed to initialize shared file system path " + - "(path must be readable and writable): " + path); - - folder = tmp; - } - finally { - initLatch.countDown(); - } - } - else { - try { - U.await(initLatch); - } - catch (GridInterruptedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - - if (folder == null) - throw new IgniteSpiException("Failed to initialize shared file system folder (check logs for errors)."); - } - - return folder; - } - - /** {@inheritDoc} */ - @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException { - initFolder(); - - Collection<InetSocketAddress> addrs = new LinkedList<>(); - - for (String fileName : folder.list()) - if (!".svn".equals(fileName)) { - InetSocketAddress addr = null; - - StringTokenizer st = new StringTokenizer(fileName, DELIM); - - if (st.countTokens() == 2) { - String addrStr = st.nextToken(); - String portStr = st.nextToken(); - - try { - int port = Integer.parseInt(portStr); - - addr = new InetSocketAddress(addrStr, port); - } - catch (IllegalArgumentException e) { - U.error(log, "Failed to parse file entry: " + fileName, e); - } - } - - if (addr != null) - addrs.add(addr); - } - - return Collections.unmodifiableCollection(addrs); - } - - /** {@inheritDoc} */ - @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { - assert !F.isEmpty(addrs); - - initFolder(); - - try { - for (InetSocketAddress addr : addrs) { - File file = new File(folder, name(addr)); - - file.createNewFile(); - } - } - catch (IOException e) { - throw new IgniteSpiException("Failed to create file.", e); - } - } - - /** {@inheritDoc} */ - @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { - assert !F.isEmpty(addrs); - - initFolder(); - - try { - for (InetSocketAddress addr : addrs) { - File file = new File(folder, name(addr)); - - file.delete(); - } - } - catch (SecurityException e) { - throw new IgniteSpiException("Failed to delete file.", e); - } - } - - /** - * Creates file name for address. - * - * @param addr Node address. - * @return Name. - */ - private String name(InetSocketAddress addr) { - assert addr != null; - - SB sb = new SB(); - - sb.a(addr.getAddress().getHostAddress()) - .a(DELIM) - .a(addr.getPort()); - - return sb.toString(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TcpDiscoverySharedFsIpFinder.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/sharedfs/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/sharedfs/package.html b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/sharedfs/package.html deleted file mode 100644 index e1305a3..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/sharedfs/package.html +++ /dev/null @@ -1,15 +0,0 @@ -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<!-- - @html.file.header - _________ _____ __________________ _____ - __ ____/___________(_)______ /__ ____/______ ____(_)_______ - _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ ---> -<html> -<body> - <!-- Package description. --> - Contains shared filesystem-based IP finder. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java deleted file mode 100644 index fdcee08..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java +++ /dev/null @@ -1,255 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.ipfinder.vm; - -import org.apache.ignite.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.*; -import org.gridgain.grid.util.tostring.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; - -import java.net.*; -import java.util.*; - -import static org.apache.ignite.IgniteSystemProperties.*; - -/** - * IP Finder which works only with pre-configured list of IP addresses specified - * via {@link #setAddresses(Collection)} method. By default, this IP finder is - * not {@code shared}, which means that all grid nodes have to be configured with the - * same list of IP addresses when this IP finder is used. - * <h1 class="header">Configuration</h1> - * <h2 class="header">Mandatory</h2> - * There are no mandatory configuration parameters. - * <h2 class="header">Optional</h2> - * <ul> - * <li>Addresses for initialization (see {@link #setAddresses(Collection)})</li> - * <li>Shared flag (see {@link #setShared(boolean)})</li> - * </ul> - */ -public class TcpDiscoveryVmIpFinder extends TcpDiscoveryIpFinderAdapter { - /** Grid logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** Addresses. */ - @GridToStringInclude - private Collection<InetSocketAddress> addrs; - - /** - * Initialize from system property. - */ - { - String ips = IgniteSystemProperties.getString(GG_TCP_DISCOVERY_ADDRESSES); - - if (!F.isEmpty(ips)) { - Collection<InetSocketAddress> addrsList = new LinkedHashSet<>(); - - for (String s : ips.split(",")) { - if (!F.isEmpty(s)) { - s = s.trim(); - - if (!F.isEmpty(s)) { - try { - addrsList.addAll(address(s)); - } - catch (IgniteSpiException e) { - throw new GridRuntimeException(e); - } - } - } - } - - addrs = addrsList; - } - else - addrs = new LinkedHashSet<>(); - } - - /** - * Constructs new IP finder. - */ - public TcpDiscoveryVmIpFinder() { - // No-op. - } - - /** - * Constructs new IP finder. - * - * @param shared {@code true} if IP finder is shared. - * @see #setShared(boolean) - */ - public TcpDiscoveryVmIpFinder(boolean shared) { - setShared(shared); - } - - /** - * Parses provided values and initializes the internal collection of addresses. - * <p> - * Addresses may be represented as follows: - * <ul> - * <li>IP address (e.g. 127.0.0.1, 9.9.9.9, etc);</li> - * <li>IP address and port (e.g. 127.0.0.1:47500, 9.9.9.9:47501, etc);</li> - * <li>IP address and port range (e.g. 127.0.0.1:47500..47510, 9.9.9.9:47501..47504, etc);</li> - * <li>Hostname (e.g. host1.com, host2, etc);</li> - * <li>Hostname and port (e.g. host1.com:47500, host2:47502, etc).</li> - * <li>Hostname and port range (e.g. host1.com:47500..47510, host2:47502..47508, etc).</li> - * </ul> - * <p> - * If port is 0 or not provided then default port will be used (depends on - * discovery SPI configuration). - * <p> - * If port range is provided (e.g. host:port1..port2) the following should be considered: - * <ul> - * <li>{@code port1 < port2} should be {@code true};</li> - * <li>Both {@code port1} and {@code port2} should be greater than {@code 0}.</li> - * </ul> - * - * @param addrs Known nodes addresses. - * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. - */ - @IgniteSpiConfiguration(optional = true) - public synchronized void setAddresses(Collection<String> addrs) throws IgniteSpiException { - if (F.isEmpty(addrs)) - return; - - Collection<InetSocketAddress> newAddrs = new LinkedHashSet<>(); - - for (String ipStr : addrs) - newAddrs.addAll(address(ipStr)); - - this.addrs = newAddrs; - } - - /** - * Creates address from string. - * - * @param ipStr Address string. - * @return Socket addresses (may contain 1 or more addresses if provided string - * includes port range). - * @throws org.apache.ignite.spi.IgniteSpiException If failed. - */ - private static Collection<InetSocketAddress> address(String ipStr) throws IgniteSpiException { - ipStr = ipStr.trim(); - - String errMsg = "Failed to parse provided address: " + ipStr; - - int colonCnt = ipStr.length() - ipStr.replace(":", "").length(); - - if (colonCnt > 1) { - // IPv6 address (literal IPv6 addresses are enclosed in square brackets, for example - // https://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:443). - if (ipStr.startsWith("[")) { - ipStr = ipStr.substring(1); - - if (ipStr.contains("]:")) - return addresses(ipStr, "\\]\\:", errMsg); - else if (ipStr.endsWith("]")) - ipStr = ipStr.substring(0, ipStr.length() - 1); - else - throw new IgniteSpiException(errMsg); - } - } - else { - // IPv4 address. - if (ipStr.endsWith(":")) - ipStr = ipStr.substring(0, ipStr.length() - 1); - else if (ipStr.indexOf(':') >= 0) - return addresses(ipStr, "\\:", errMsg); - } - - // Provided address does not contain port (will use default one). - return Collections.singleton(new InetSocketAddress(ipStr, 0)); - } - - /** - * Creates address from string with port information. - * - * @param ipStr Address string - * @param regexDelim Port regex delimiter. - * @param errMsg Error message. - * @return Socket addresses (may contain 1 or more addresses if provided string - * includes port range). - * @throws org.apache.ignite.spi.IgniteSpiException If failed. - */ - private static Collection<InetSocketAddress> addresses(String ipStr, String regexDelim, String errMsg) - throws IgniteSpiException { - String[] tokens = ipStr.split(regexDelim); - - if (tokens.length == 2) { - String addrStr = tokens[0]; - String portStr = tokens[1]; - - if (portStr.contains("..")) { - try { - int port1 = Integer.parseInt(portStr.substring(0, portStr.indexOf(".."))); - int port2 = Integer.parseInt(portStr.substring(portStr.indexOf("..") + 2, portStr.length())); - - if (port2 < port1 || port1 == port2 || port1 <= 0 || port2 <= 0) - throw new IgniteSpiException(errMsg); - - Collection<InetSocketAddress> res = new ArrayList<>(port2 - port1); - - // Upper bound included. - for (int i = port1; i <= port2; i++) - res.add(new InetSocketAddress(addrStr, i)); - - return res; - } - catch (IllegalArgumentException e) { - throw new IgniteSpiException(errMsg, e); - } - } - else { - try { - int port = Integer.parseInt(portStr); - - return Collections.singleton(new InetSocketAddress(addrStr, port)); - } - catch (IllegalArgumentException e) { - throw new IgniteSpiException(errMsg, e); - } - } - } - else - throw new IgniteSpiException(errMsg); - } - - /** {@inheritDoc} */ - @Override public synchronized Collection<InetSocketAddress> getRegisteredAddresses() { - return Collections.unmodifiableCollection(addrs); - } - - /** {@inheritDoc} */ - @Override public synchronized void registerAddresses(Collection<InetSocketAddress> addrs) { - assert !F.isEmpty(addrs); - - this.addrs = new LinkedHashSet<>(this.addrs); - - this.addrs.addAll(addrs); - } - - /** {@inheritDoc} */ - @Override public synchronized void unregisterAddresses(Collection<InetSocketAddress> addrs) { - assert !F.isEmpty(addrs); - - this.addrs = new LinkedHashSet<>(this.addrs); - - this.addrs.removeAll(addrs); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TcpDiscoveryVmIpFinder.class, this, "super", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/vm/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/vm/package.html b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/vm/package.html deleted file mode 100644 index 7016083..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/vm/package.html +++ /dev/null @@ -1,15 +0,0 @@ -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<!-- - @html.file.header - _________ _____ __________________ _____ - __ ____/___________(_)______ /__ ____/______ ____(_)_______ - _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ ---> -<html> -<body> - <!-- Package description. --> - Contains local JVM-based IP finder. -</body> -</html>