http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiConfiguration.java new file mode 100644 index 0000000..a88f0e0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiConfiguration.java @@ -0,0 +1,569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.*; + +import static org.apache.ignite.IgniteSystemProperties.*; + +/** + * Configuration bean for {@link TcpCommunicationSpi}: holds its tunable properties together with their default values. + */ +@IgniteSpiMultipleInstancesSupport(true) +@IgniteSpiConsistencyChecked(optional = false) +public class TcpCommunicationSpiConfiguration implements CommunicationSpiConfiguration<TcpCommunicationSpi> { + /** Default port which node sets listener to (value is <tt>47100</tt>). */ + public static final int DFLT_PORT = 47100; + + /** Default port which node sets listener for shared memory connections (value is <tt>48100</tt>). */ + public static final int DFLT_SHMEM_PORT = 48100; + + /** Default idle connection timeout (value is <tt>30000</tt>ms). */ + public static final long DFLT_IDLE_CONN_TIMEOUT = 30000; + + /** Default value for connection buffer flush frequency (value is <tt>100</tt> ms). */ + public static final long DFLT_CONN_BUF_FLUSH_FREQ = 100; + + /** Default value for connection buffer size (value is <tt>0</tt>). */ + public static final int DFLT_CONN_BUF_SIZE = 0; + + /** Default socket send and receive buffer size. */ + public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024; + + /** Default connection timeout (value is <tt>1000</tt>ms). */ + public static final long DFLT_CONN_TIMEOUT = 1000; + + /** Default maximum connection timeout (value is <tt>600,000</tt>ms). */ + public static final long DFLT_MAX_CONN_TIMEOUT = 10 * 60 * 1000; + + /** Default reconnect attempts count (value is <tt>10</tt>). */ + public static final int DFLT_RECONNECT_CNT = 10; + + /** Default message queue limit per connection (for incoming and outgoing messages). 
*/ + public static final int DFLT_MSG_QUEUE_LIMIT = GridNioServer.DFLT_SEND_QUEUE_LIMIT; + + /** + * Default count of selectors for TCP server equals to + * {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}. + */ + public static final int DFLT_SELECTORS_CNT = Math.min(4, Runtime.getRuntime().availableProcessors()); + + /** + * Default local port range (value is <tt>100</tt>). + * See {@link #setLocalPortRange(int)} for details. + */ + public static final int DFLT_PORT_RANGE = 100; + + /** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */ + public static final boolean DFLT_TCP_NODELAY = true; + + /** Default received messages threshold for sending ack (value is <tt>16</tt>). */ + public static final int DFLT_ACK_SND_THRESHOLD = 16; + + /** Default socket write timeout. */ + public static final long DFLT_SOCK_WRITE_TIMEOUT = GridNioServer.DFLT_SES_WRITE_TIMEOUT; + + /** Local IP address. */ + private String locAddr; + + /** Local port which node uses. */ + private int locPort = DFLT_PORT; + + /** Local port range. */ + private int locPortRange = DFLT_PORT_RANGE; + + /** Local port which node uses to accept shared memory connections. */ + private int shmemPort = DFLT_SHMEM_PORT; + + /** Allocate direct buffer or heap buffer. */ + private boolean directBuf = true; + + /** Allocate direct buffer or heap buffer for sending. */ + private boolean directSndBuf; + + /** Idle connection timeout. */ + private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT; + + /** Connection buffer flush frequency. */ + private volatile long connBufFlushFreq = DFLT_CONN_BUF_FLUSH_FREQ; + + /** Connection buffer size. */ + @SuppressWarnings("RedundantFieldInitialization") + private int connBufSize = DFLT_CONN_BUF_SIZE; + + /** Connect timeout. */ + private long connTimeout = DFLT_CONN_TIMEOUT; + + /** Maximum connect timeout. */ + private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT; + + /** Reconnect attempts count. 
*/ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) + private int reconCnt = DFLT_RECONNECT_CNT; + + /** Socket send buffer. */ + private int sockSndBuf = DFLT_SOCK_BUF_SIZE; + + /** Socket receive buffer. */ + private int sockRcvBuf = DFLT_SOCK_BUF_SIZE; + + /** Message queue limit. */ + private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT; + + /** Min buffered message count. */ + private int minBufferedMsgCnt = Integer.getInteger(IGNITE_MIN_BUFFERED_COMMUNICATION_MSG_CNT, 512); + + /** Buffer size ratio. */ + private double bufSizeRatio = IgniteSystemProperties.getDouble(IGNITE_COMMUNICATION_BUF_RESIZE_RATIO, 0.8); + + /** {@code TCP_NODELAY} option value for created sockets. */ + private boolean tcpNoDelay = DFLT_TCP_NODELAY; + + /** Number of received messages after which acknowledgment is sent. */ + private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD; + + /** Maximum number of unacknowledged messages. */ + private int unackedMsgsBufSize; + + /** Socket write timeout. */ + private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT; + + /** Count of selectors to use in TCP server. */ + private int selectorsCnt = DFLT_SELECTORS_CNT; + + /** Address resolver. */ + private IgniteAddressResolver addrRslvr; + + /** {@inheritDoc} */ + @Override public Class<? extends TcpCommunicationSpi> spiClass() { + return TcpCommunicationSpi.class; + } + + /** + * Sets address resolver; a resolver that is already set is not overwritten. + * + * @param addrRslvr Address resolver. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setAddressResolver(IgniteAddressResolver addrRslvr) { + // Injection should not override value already set by Spring or user. + if (this.addrRslvr == null) + this.addrRslvr = addrRslvr; + } + + /** + * Gets address resolver. + * + * @return Address resolver. + */ + public IgniteAddressResolver getAddressResolver() { + return addrRslvr; + } + + /** + * Sets local host address for socket binding. 
Note that one node could have + * additional addresses beside the loopback one. 
This configuration + * parameter is optional; an address that is already set is not overwritten. + * + * @param locAddr IP address. Default value is any available local + * IP address. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setLocalAddress(String locAddr) { + // Injection should not override value already set by Spring or user. + if (this.locAddr == null) + this.locAddr = locAddr; + } + + public String getLocalAddress() { + return locAddr; + } + + /** + * Sets local port for socket binding. + * <p> + * If not provided, default value is {@link #DFLT_PORT}. + * + * @param locPort Port number. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setLocalPort(int locPort) { + this.locPort = locPort; + } + + public int getLocalPort() { + return locPort; + } + + /** + * Sets local port range for local host ports (value must be greater than or equal to <tt>0</tt>). + * If provided local port (see {@link #setLocalPort(int)}) is occupied, + * implementation will try to increment the port number for as long as it is less than + * initial value plus this range. + * <p> + * If port range value is <tt>0</tt>, then implementation will try bind only to the port provided by + * {@link #setLocalPort(int)} method and fail if binding to this port did not succeed. + * <p> + * Local port range is very useful during development when more than one grid node needs to run + * on the same physical machine. + * <p> + * If not provided, default value is {@link #DFLT_PORT_RANGE}. + * + * @param locPortRange New local port range. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setLocalPortRange(int locPortRange) { + this.locPortRange = locPortRange; + } + + public int getLocalPortRange() { + return locPortRange; + } + + /** + * Sets local port to accept shared memory connections. + * <p> + * If set to {@code -1} shared memory communication will be disabled. + * <p> + * If not provided, default value is {@link #DFLT_SHMEM_PORT}. 
+ * + * @param shmemPort Port number. 
+ */ + @IgniteSpiConfigurationProperty(optional = true) + public void setSharedMemoryPort(int shmemPort) { + this.shmemPort = shmemPort; + } + + public int getSharedMemoryPort() { + return shmemPort; + } + + /** + * Sets maximum idle connection timeout upon which a connection + * to client will be closed. + * <p> + * If not provided, default value is {@link #DFLT_IDLE_CONN_TIMEOUT}. + * + * @param idleConnTimeout Maximum idle connection time. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setIdleConnectionTimeout(long idleConnTimeout) { + this.idleConnTimeout = idleConnTimeout; + } + + public long getIdleConnectionTimeout() { + return idleConnTimeout; + } + + public long getSocketWriteTimeout() { + return sockWriteTimeout; + } + + /** + * Sets socket write timeout for TCP connection. If message can not be written to + * socket within this time then connection is closed and reconnect is attempted. + * <p> + * Defaults to {@link #DFLT_SOCK_WRITE_TIMEOUT}. + * + * @param sockWriteTimeout Socket write timeout for TCP connection. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setSocketWriteTimeout(long sockWriteTimeout) { + this.sockWriteTimeout = sockWriteTimeout; + } + + public int getAckSendThreshold() { + return ackSndThreshold; + } + + /** + * Sets number of received messages per connection to node after which acknowledgment message is sent. + * <p> + * Defaults to {@link #DFLT_ACK_SND_THRESHOLD}. + * + * @param ackSndThreshold Number of received messages after which acknowledgment is sent. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setAckSendThreshold(int ackSndThreshold) { + this.ackSndThreshold = ackSndThreshold; + } + + public int getUnacknowledgedMessagesBufferSize() { + return unackedMsgsBufSize; + } + + /** + * Sets maximum number of stored unacknowledged messages per connection to node. 
+ * If number of unacknowledged messages exceeds this number then connection to node is + * closed and reconnect is attempted. + * + * @param unackedMsgsBufSize Maximum number of unacknowledged messages. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) { + this.unackedMsgsBufSize = unackedMsgsBufSize; + } + + /** + * Sets connection buffer size. If set to {@code 0} connection buffer is disabled. + * <p> + * If not provided, default value is {@link #DFLT_CONN_BUF_SIZE}. + * + * @param connBufSize Connection buffer size. + * @see #setConnectionBufferFlushFrequency(long) + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setConnectionBufferSize(int connBufSize) { + this.connBufSize = connBufSize; + } + + public int getConnectionBufferSize() { + return connBufSize; + } + + @IgniteSpiConfigurationProperty(optional = true) + public void setConnectionBufferFlushFrequency(long connBufFlushFreq) { + this.connBufFlushFreq = connBufFlushFreq; + } + + public long getConnectionBufferFlushFrequency() { + return connBufFlushFreq; + } + + /** + * Sets connect timeout used when establishing connection + * with remote nodes. + * <p> + * {@code 0} is interpreted as infinite timeout. + * <p> + * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}. + * + * @param connTimeout Connect timeout. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setConnectTimeout(long connTimeout) { + this.connTimeout = connTimeout; + } + + public long getConnectTimeout() { + return connTimeout; + } + + /** + * Sets maximum connect timeout. If handshake is not established within connect timeout, + * then SPI tries to repeat handshake procedure with increased connect timeout. + * Connect timeout can grow till maximum timeout value, + * if maximum timeout value is reached then the handshake is considered as failed. + * <p> + * {@code 0} is interpreted as infinite timeout. 
+ * <p> + * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}. + * + * @param maxConnTimeout Maximum connect timeout. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setMaxConnectTimeout(long maxConnTimeout) { + this.maxConnTimeout = maxConnTimeout; + } + + public long getMaxConnectTimeout() { + return maxConnTimeout; + } + + /** + * Sets maximum number of reconnect attempts used when establishing connection + * with remote nodes. + * <p> + * If not provided, default value is {@link #DFLT_RECONNECT_CNT}. + * + * @param reconCnt Maximum number of reconnection attempts. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setReconnectCount(int reconCnt) { + this.reconCnt = reconCnt; + } + + public int getReconnectCount() { + return reconCnt; + } + + /** + * Sets flag to allocate direct or heap buffer in SPI. + * If value is {@code true}, then SPI will use {@link java.nio.ByteBuffer#allocateDirect(int)} call. + * Otherwise, SPI will use {@link java.nio.ByteBuffer#allocate(int)} call. + * <p> + * If not provided, default value is {@code true}. + * + * @param directBuf Flag indicates to allocate direct or heap buffer in SPI. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setDirectBuffer(boolean directBuf) { + this.directBuf = directBuf; + } + + public boolean isDirectBuffer() { + return directBuf; + } + + public boolean isDirectSendBuffer() { + return directSndBuf; + } + + /** + * Sets whether to use direct buffer for sending. + * <p> + * If not provided default is {@code false}. + * + * @param directSndBuf {@code True} to use direct buffers for send. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setDirectSendBuffer(boolean directSndBuf) { + this.directSndBuf = directSndBuf; + } + + /** + * Sets the count of selectors to be used in TCP server. + * <p> + * If not provided, default value is {@link #DFLT_SELECTORS_CNT}. + * + * @param selectorsCnt Selectors count. 
+ */ + @IgniteSpiConfigurationProperty(optional = true) + public void setSelectorsCount(int selectorsCnt) { + this.selectorsCnt = selectorsCnt; + } + + public int getSelectorsCount() { + return selectorsCnt; + } + + /** + * Sets value for {@code TCP_NODELAY} socket option. Each + * socket will be opened using provided value. + * <p> + * Setting this option to {@code true} disables Nagle's algorithm + * for sockets, decreasing latency and delivery time for small messages. + * <p> + * For systems that work under heavy network load it is advisable to + * set this value to {@code false}. + * <p> + * If not provided, default value is {@link #DFLT_TCP_NODELAY}. + * + * @param tcpNoDelay {@code True} to disable TCP delay. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setTcpNoDelay(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public boolean isTcpNoDelay() { + return tcpNoDelay; + } + + /** + * Sets receive buffer size for sockets created or accepted by this SPI. + * <p> + * If not provided, default is {@link #DFLT_SOCK_BUF_SIZE}. + * + * @param sockRcvBuf Socket receive buffer size. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setSocketReceiveBuffer(int sockRcvBuf) { + this.sockRcvBuf = sockRcvBuf; + } + + public int getSocketReceiveBuffer() { + return sockRcvBuf; + } + + /** + * Sets send buffer size for sockets created or accepted by this SPI. + * <p> + * If not provided, default is {@link #DFLT_SOCK_BUF_SIZE}. + * + * @param sockSndBuf Socket send buffer size. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setSocketSendBuffer(int sockSndBuf) { + this.sockSndBuf = sockSndBuf; + } + + public int getSocketSendBuffer() { + return sockSndBuf; + } + + /** + * Sets message queue limit for incoming and outgoing messages. + * <p> + * When set to positive number send queue is limited to the configured value. + * {@code 0} disables the size limitations. 
+ * <p> + * If not provided, default is {@link #DFLT_MSG_QUEUE_LIMIT}. + * + * @param msgQueueLimit Send queue size limit. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setMessageQueueLimit(int msgQueueLimit) { + this.msgQueueLimit = msgQueueLimit; + } + + public int getMessageQueueLimit() { + return msgQueueLimit; + } + + /** + * Sets the minimum number of messages for this SPI, that are buffered + * prior to sending. + * <p> + * Defaults to either {@code 512} or {@link org.apache.ignite.IgniteSystemProperties#IGNITE_MIN_BUFFERED_COMMUNICATION_MSG_CNT} + * system property (if specified). + * + * @param minBufferedMsgCnt Minimum buffered message count. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setMinimumBufferedMessageCount(int minBufferedMsgCnt) { + this.minBufferedMsgCnt = minBufferedMsgCnt; + } + + /** Gets the minimum number of messages that are buffered prior to sending. */ + public int getMinimumBufferedMessageCount() { + return minBufferedMsgCnt; + } + + /** + * Sets the buffer size ratio for this SPI. As messages are sent, + * the buffer size is adjusted using this ratio. + * <p> + * Defaults to either {@code 0.8} or {@link org.apache.ignite.IgniteSystemProperties#IGNITE_COMMUNICATION_BUF_RESIZE_RATIO} + * system property (if specified). + * + * @param bufSizeRatio Buffer size ratio. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setBufferSizeRatio(double bufSizeRatio) { + this.bufSizeRatio = bufSizeRatio; + } + + public double getBufferSizeRatio() { + return bufSizeRatio; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/deployment/DeploymentSpiConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/deployment/DeploymentSpiConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/deployment/DeploymentSpiConfiguration.java new file mode 100644 index 0000000..cca6b45 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/deployment/DeploymentSpiConfiguration.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.deployment; + +import org.apache.ignite.spi.*; + +/** + * Configuration contract for {@link DeploymentSpi} implementations; declares no members of its own. + */ +public interface DeploymentSpiConfiguration<T extends DeploymentSpi> extends IgniteSpiConfiguration<T> { +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java index 70e4a8a..0bdca06 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java @@ -67,7 +67,16 @@ public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSp /** Deployment SPI listener. */ private volatile DeploymentListener lsnr; - /** {@inheritDoc} */ + /** + * Creates the SPI with a default {@link LocalDeploymentSpiConfiguration}. + */ + public LocalDeploymentSpi() { + super(new LocalDeploymentSpiConfiguration()); + } + + /** + * Creates the SPI from the given configuration, which is passed on to the superclass constructor. 
+ */ public LocalDeploymentSpi(LocalDeploymentSpiConfiguration spiCfg) { super(spiCfg); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpiConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpiConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpiConfiguration.java index eb24db2..2457aff 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpiConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpiConfiguration.java @@ -26,7 +26,7 @@ import 
org.apache.ignite.spi.deployment.*; @IgniteSpiMultipleInstancesSupport(true) @IgniteSpiConsistencyChecked(optional = false) @IgnoreIfPeerClassLoadingDisabled -public class LocalDeploymentSpiConfiguration implements IgniteSpiConfiguration<LocalDeploymentSpi> { +public class LocalDeploymentSpiConfiguration implements DeploymentSpiConfiguration<LocalDeploymentSpi> { /** {@inheritDoc} */ @Override public Class<LocalDeploymentSpi> spiClass() { return LocalDeploymentSpi.class; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiConfiguration.java new file mode 100644 index 0000000..79fbc02 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiConfiguration.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery; + +import org.apache.ignite.spi.*; + +/** + * Configuration contract for {@link DiscoverySpi} implementations; declares no members of its own. + */ +public interface DiscoverySpiConfiguration<T extends DiscoverySpi> extends IgniteSpiConfiguration<T> { +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 32c8c12..8f69bf4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -94,6 +94,12 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp /** Disconnect check interval. 
*/ private long disconnectCheckInt = DFLT_DISCONNECT_CHECK_INT; + public TcpClientDiscoverySpi(TcpClientDiscoverySpiConfiguration spiCfg) { + super(spiCfg); + + setDisconnectCheckInterval(spiCfg.getDisconnectCheckInterval()); + } + /** {@inheritDoc} */ @Override public long getDisconnectCheckInterval() { return disconnectCheckInt; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiConfiguration.java new file mode 100644 index 0000000..ddcb40c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiConfiguration.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.IgniteDiscoveryEvent; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.IgniteSpiConfigurationProperty; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; +import org.apache.ignite.spi.IgniteSpiThread; +import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport; +import org.apache.ignite.spi.discovery.DiscoverySpiListener; +import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; +import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; 
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.jdk8.backport.ConcurrentHashMap8; +import org.jetbrains.annotations.Nullable; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.*; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.MetricsSet; + +/** + * Configuration for {@link TcpClientDiscoverySpi}, the client discovery SPI that uses TCP/IP for node discovery. + * <p> + * The client discovery SPI requires at least one server node configured with + * {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}. It will try to connect to random IP taken from + * {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} which should point to one of these server + * nodes and will maintain connection only with this node (will not enter the ring). + * If this connection is broken, it will try to reconnect using addresses from + * the same IP finder. + */ +@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") +@IgniteSpiMultipleInstancesSupport(true) +@DiscoverySpiOrderSupport(true) +@DiscoverySpiHistorySupport(true) +public class TcpClientDiscoverySpiConfiguration extends TcpDiscoverySpiAdapterConfiguration<TcpClientDiscoverySpi> { + /** Default disconnect check interval (value is <tt>2000</tt>). */ + public static final long DFLT_DISCONNECT_CHECK_INT = 2000; + + /** Disconnect check interval. 
*/ + private long disconnectCheckInt = DFLT_DISCONNECT_CHECK_INT; + + public long getDisconnectCheckInterval() { + return disconnectCheckInt; + } + + /** {@inheritDoc} */ + @Override public Class<TcpClientDiscoverySpi> spiClass() { + return TcpClientDiscoverySpi.class; + } + + /** + * Sets disconnect check interval. + * + * @param disconnectCheckInt Disconnect check interval. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setDisconnectCheckInterval(long disconnectCheckInt) { + this.disconnectCheckInt = disconnectCheckInt; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 448a01c..6908639 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -281,6 +281,26 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private ConcurrentLinkedDeque<String> debugLog; + public TcpDiscoverySpi() { + this(new TcpDiscoverySpiConfiguration()); + } + + public TcpDiscoverySpi(TcpDiscoverySpiConfiguration spiCfg) { + super(spiCfg); + + setAddressResolver(spiCfg.getAddressResolver()); + setIpFinderCleanFrequency(spiCfg.getIpFinderCleanFrequency()); + setJoinTimeout(spiCfg.getJoinTimeout()); + setLocalPort(spiCfg.getLocalPort()); + setLocalPortRange(spiCfg.getLocalPortRange()); + setMaxAckTimeout(spiCfg.getMaxAckTimeout()); + setMaxMissedClientHeartbeats(spiCfg.getMaxMissedClientHeartbeats()); + setMaxMissedHeartbeats(spiCfg.getMaxMissedHeartbeats()); + 
setReconnectCount(spiCfg.getReconnectCount()); + setStatisticsPrintFrequency(spiCfg.getStatisticsPrintFrequency()); + setDebugMode(spiCfg.isDebugMode()); + } + /** {@inheritDoc} */ @IgniteInstanceResource @Override public void injectResources(Ignite ignite) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java index b0f01c5..500bf50 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java @@ -144,6 +144,39 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov protected IgniteLogger log; /** + * Creates the SPI from a configuration object: copies its settings through the setters and, when an IP finder configuration is present, reflectively instantiates the IP finder via a public constructor whose single parameter type is that configuration's runtime class (a failure is rethrown as {@code IgniteException}). + * + * @param spiCfg SPI configuration. + */ + protected TcpDiscoverySpiAdapter(TcpDiscoverySpiAdapterConfiguration spiCfg) { + super(spiCfg); + + setAckTimeout(spiCfg.getAckTimeout()); + setHeartbeatFrequency(spiCfg.getHbFreq()); + setLocalAddress(spiCfg.getLocalAddress()); + setNetworkTimeout(spiCfg.getNetTimeout()); + setSocketTimeout(spiCfg.getSockTimeout()); + setThreadPriority(spiCfg.getThreadPri()); + setTopHistorySize(spiCfg.getTopHistSize()); + + TcpDiscoveryIpFinderConfiguration ipFinderCfg = spiCfg.getIpFinderConfiguration(); + + TcpDiscoveryIpFinder tcpIpFinder = null; + + if (ipFinderCfg != null) { + try { + tcpIpFinder = (TcpDiscoveryIpFinder)(ipFinderCfg.getTcpDiscoveryIpFinderClass() + .getConstructor(ipFinderCfg.getClass()).newInstance(ipFinderCfg)); + } + catch (Exception e) { + throw new IgniteException("Failed. 
Can't create TcpDiscoveryIpFinder.", e); + } + } + + setIpFinder(tcpIpFinder); + } + + /** * Inject resources * * @param ignite Ignite. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapterConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapterConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapterConfiguration.java new file mode 100644 index 0000000..67adfc2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapterConfiguration.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.util.GridConcurrentSkipListSet; +import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.marshaller.IgniteMarshaller; +import org.apache.ignite.marshaller.jdk.IgniteJdkMarshaller; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.IgniteLoggerResource; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryStatistics; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderConfiguration; +import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.jetbrains.annotations.Nullable; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.*; +import java.util.*; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.DISCONNECTED; + +/** + * Base class for TCP discovery SPIs. 
+ */ +abstract class TcpDiscoverySpiAdapterConfiguration<T extends TcpDiscoverySpiAdapter> + implements DiscoverySpiConfiguration<T> { + /** Default socket operations timeout in milliseconds (value is <tt>2,000ms</tt>). */ + public static final long DFLT_SOCK_TIMEOUT = 2000; + + /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */ + public static final long DFLT_ACK_TIMEOUT = 5000; + + /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */ + public static final long DFLT_NETWORK_TIMEOUT = 5000; + + /** Default value for thread priority (value is <tt>10</tt>). */ + public static final int DFLT_THREAD_PRI = 10; + + /** Default heartbeat messages issuing frequency (value is <tt>2,000ms</tt>). */ + public static final long DFLT_HEARTBEAT_FREQ = 2000; + + /** Default size of topology snapshots history. */ + public static final int DFLT_TOP_HISTORY_SIZE = 1000; + + /** Local address. */ + protected String locAddr; + + /** IP finder. */ + protected TcpDiscoveryIpFinderConfiguration ipFinderCfg; + + /** Socket operations timeout. */ + protected long sockTimeout = DFLT_SOCK_TIMEOUT; + + /** Message acknowledgement timeout. */ + protected long ackTimeout = DFLT_ACK_TIMEOUT; + + /** Network timeout. */ + protected long netTimeout = DFLT_NETWORK_TIMEOUT; + + /** Thread priority for all threads started by SPI. */ + protected int threadPri = DFLT_THREAD_PRI; + + /** Heartbeat messages issuing frequency. */ + protected long hbFreq = DFLT_HEARTBEAT_FREQ; + + /** Size of topology snapshots history. */ + protected int topHistSize = DFLT_TOP_HISTORY_SIZE; + + /** + * Sets local host IP address that discovery SPI uses. + * <p> + * If not provided, by default a first found non-loopback address + * will be used. If there is no non-loopback address available, + * then {@link java.net.InetAddress#getLocalHost()} will be used. + * + * @param locAddr IP address. 
+ */ + @IgniteSpiConfigurationProperty(optional = true) + public void setLocalAddress(String locAddr) { + this.locAddr = locAddr; + } + + /** + * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method. + * + * @return local address. + */ + public String getLocalAddress() { + return locAddr; + } + + /** + * Gets IP finder for IP addresses sharing and storing. + * + * @return IP finder for IP addresses sharing and storing. + */ + public TcpDiscoveryIpFinderConfiguration getIpFinderConfiguration() { + return ipFinderCfg; + } + + /** + * Sets IP finder for IP addresses sharing and storing. + * <p> + * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default. + * + * @param ipFinderCfg IP finder. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setIpFinder(TcpDiscoveryIpFinderConfiguration ipFinderCfg) { + this.ipFinderCfg = ipFinderCfg; + } + + /** + * Sets socket operations timeout. This timeout is used to limit connection time and + * write-to-socket time. + * <p> + * Note that when running GridGain on Amazon EC2, socket timeout must be set to a value + * significantly greater than the default (e.g. to {@code 30000}). + * <p> + * If not specified, default is {@link #DFLT_SOCK_TIMEOUT}. + * + * @param sockTimeout Socket connection timeout. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setSocketTimeout(long sockTimeout) { + this.sockTimeout = sockTimeout; + } + + /** + * Sets timeout for receiving acknowledgement for sent message. + * <p> + * If acknowledgement is not received within this timeout, sending is considered as failed + * and SPI tries to repeat message sending. + * <p> + * If not specified, default is {@link #DFLT_ACK_TIMEOUT}. + * + * @param ackTimeout Acknowledgement timeout. 
+ */ + @IgniteSpiConfigurationProperty(optional = true) + public void setAckTimeout(long ackTimeout) { + this.ackTimeout = ackTimeout; + } + + /** + * Sets maximum network timeout to use for network operations. + * <p> + * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}. + * + * @param netTimeout Network timeout. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setNetworkTimeout(long netTimeout) { + this.netTimeout = netTimeout; + } + + /** + * Sets thread priority. All threads within SPI will be started with it. + * <p> + * If not provided, default value is {@link #DFLT_THREAD_PRI} + * + * @param threadPri Thread priority. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setThreadPriority(int threadPri) { + this.threadPri = threadPri; + } + + /** + * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages + * in configurable time interval to other nodes to notify them about its state. + * <p> + * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}. + * + * @param hbFreq Heartbeat frequency in milliseconds. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setHeartbeatFrequency(long hbFreq) { + this.hbFreq = hbFreq; + } + + /** + * Sets size of topology snapshots history. Specified size should be greater than or equal to default size + * {@link #DFLT_TOP_HISTORY_SIZE}. + * + * @param topHistSize Size of topology snapshots history. 
+ */ + @IgniteSpiConfigurationProperty(optional = true) + public void setTopHistorySize(int topHistSize) { + this.topHistSize = topHistSize; + } + + public long getSockTimeout() { + return sockTimeout; + } + + public long getAckTimeout() { + return ackTimeout; + } + + public long getNetTimeout() { + return netTimeout; + } + + public int getThreadPri() { + return threadPri; + } + + public long getHbFreq() { + return hbFreq; + } + + public int getTopHistSize() { + return topHistSize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfiguration.java new file mode 100644 index 0000000..aadd2bd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfiguration.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.*; + +/** + * + */ +@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") +@IgniteSpiMultipleInstancesSupport(true) +@DiscoverySpiOrderSupport(true) +@DiscoverySpiHistorySupport(true) +public class TcpDiscoverySpiConfiguration extends TcpDiscoverySpiAdapterConfiguration<TcpDiscoverySpi> { + /** Default local port range (value is <tt>100</tt>). */ + public static final int DFLT_PORT_RANGE = 100; + + /** Default timeout for joining topology (value is <tt>0</tt>). */ + public static final long DFLT_JOIN_TIMEOUT = 0; + + /** Default reconnect attempts count (value is <tt>10</tt>). */ + public static final int DFLT_RECONNECT_CNT = 10; + + /** Default max heartbeats count node can miss without initiating status check (value is <tt>1</tt>). */ + public static final int DFLT_MAX_MISSED_HEARTBEATS = 1; + + /** Default max heartbeats count node can miss without failing client node (value is <tt>5</tt>). */ + public static final int DFLT_MAX_MISSED_CLIENT_HEARTBEATS = 5; + + /** Default IP finder clean frequency in milliseconds (value is <tt>60,000ms</tt>). */ + public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000; + + /** Default statistics print frequency in milliseconds (value is <tt>0ms</tt>). */ + public static final long DFLT_STATS_PRINT_FREQ = 0; + + /** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */ + public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000; + + /** Address resolver. */ + private IgniteAddressResolver addrRslvr; + + /** Local port which node uses. */ + private int locPort = TcpDiscoverySpi.DFLT_PORT; + + /** Local port range. */ + private int locPortRange = DFLT_PORT_RANGE; + + /** Statistics print frequency. 
*/ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"}) + private long statsPrintFreq = DFLT_STATS_PRINT_FREQ; + + /** Maximum message acknowledgement timeout. */ + private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT; + + /** Join timeout. */ + @SuppressWarnings("RedundantFieldInitialization") + private long joinTimeout = DFLT_JOIN_TIMEOUT; + + /** Max heartbeats count node can miss without initiating status check. */ + private int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS; + + /** Max heartbeats count node can miss without failing client node. */ + private int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS; + + /** IP finder clean frequency. */ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) + private long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ; + + /** Reconnect attempts count. */ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) + private int reconCnt = DFLT_RECONNECT_CNT; + + /** Debug mode. */ + private boolean debugMode; + + /** {@inheritDoc} */ + @Override public Class<? extends TcpDiscoverySpi> spiClass() { + return TcpDiscoverySpi.class; + } + + /** + * Sets address resolver. + * + * @param addrRslvr Address resolver. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setAddressResolver(IgniteAddressResolver addrRslvr) { + // Injection should not override value already set by Spring or user. + if (this.addrRslvr == null) + this.addrRslvr = addrRslvr; + } + + /** + * Gets address resolver. + * + * @return Address resolver. + */ + public IgniteAddressResolver getAddressResolver() { + return addrRslvr; + } + + public int getReconnectCount() { + return reconCnt; + } + + /** + * Number of times node tries to (re)establish connection to another node. + * <p> + * Note that SPI implementation will increase {@link #ackTimeout} by factor 2 + * on every retry. + * <p> + * If not specified, default is {@link #DFLT_RECONNECT_CNT}. 
+ * + * @param reconCnt Number of retries during message sending. + * @see #setAckTimeout(long) + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setReconnectCount(int reconCnt) { + this.reconCnt = reconCnt; + } + + public long getMaxAckTimeout() { + return maxAckTimeout; + } + + /** + * Sets maximum timeout for receiving acknowledgement for sent message. + * <p> + * If acknowledgement is not received within this timeout, sending is considered as failed + * and SPI tries to repeat message sending. Every time SPI retries message sending, ack + * timeout will be increased. If no acknowledgement is received and {@code maxAckTimeout} + * is reached, then the process of message sending is considered as failed. + * <p> + * If not specified, default is {@link #DFLT_MAX_ACK_TIMEOUT}. + * + * @param maxAckTimeout Maximum acknowledgement timeout. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setMaxAckTimeout(long maxAckTimeout) { + this.maxAckTimeout = maxAckTimeout; + } + + public long getJoinTimeout() { + return joinTimeout; + } + + /** + * Sets join timeout. + * <p> + * If non-shared IP finder is used and node fails to connect to + * any address from IP finder, node keeps trying to join within this + * timeout. If all addresses are still unresponsive, exception is thrown + * and node startup fails. + * <p> + * If not specified, default is {@link #DFLT_JOIN_TIMEOUT}. + * + * @param joinTimeout Join timeout ({@code 0} means wait forever). + * + * @see org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared() + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setJoinTimeout(long joinTimeout) { + this.joinTimeout = joinTimeout; + } + + public int getLocalPort() { + return locPort; + } + + /** + * Sets local port to listen to. + * <p> + * If not specified, default is {@link TcpDiscoverySpi#DFLT_PORT}. + * + * @param locPort Local port to bind. 
+ */ + @IgniteSpiConfigurationProperty(optional = true) + public void setLocalPort(int locPort) { + this.locPort = locPort; + } + + public int getLocalPortRange() { + return locPortRange; + } + + /** + * Range for local ports. Local node will try to bind on first available port + * starting from {@link #getLocalPort()} up until + * <tt>{@link #getLocalPort()} {@code + locPortRange}</tt>. + * <p> + * If not specified, default is {@link #DFLT_PORT_RANGE}. + * + * @param locPortRange Local port range to bind. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setLocalPortRange(int locPortRange) { + this.locPortRange = locPortRange; + } + + public int getMaxMissedHeartbeats() { + return maxMissedHbs; + } + + /** + * Sets max heartbeats count node can miss without initiating status check. + * <p> + * If not provided, default value is {@link #DFLT_MAX_MISSED_HEARTBEATS}. + * + * @param maxMissedHbs Max missed heartbeats. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setMaxMissedHeartbeats(int maxMissedHbs) { + this.maxMissedHbs = maxMissedHbs; + } + + public int getMaxMissedClientHeartbeats() { + return maxMissedClientHbs; + } + + /** + * Sets max heartbeats count node can miss without failing client node. + * <p> + * If not provided, default value is {@link #DFLT_MAX_MISSED_CLIENT_HEARTBEATS}. + * + * @param maxMissedClientHbs Max missed client heartbeats. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setMaxMissedClientHeartbeats(int maxMissedClientHbs) { + this.maxMissedClientHbs = maxMissedClientHbs; + } + + public long getStatisticsPrintFrequency() { + return statsPrintFreq; + } + + /** + * Sets statistics print frequency. + * <p> + * If not set default value is {@link #DFLT_STATS_PRINT_FREQ}. + * 0 indicates that no print is required. If value is greater than 0 and log is + * not quiet then statistics are printed out with INFO level. + * <p> + * This may be very helpful for tracing topology problems. 
+ * + * @param statsPrintFreq Statistics print frequency in milliseconds. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setStatisticsPrintFrequency(long statsPrintFreq) { + this.statsPrintFreq = statsPrintFreq; + } + + public long getIpFinderCleanFrequency() { + return ipFinderCleanFreq; + } + + /** + * Sets IP finder clean frequency in milliseconds. + * <p> + * If not provided, default value is {@link #DFLT_IP_FINDER_CLEAN_FREQ} + * + * @param ipFinderCleanFreq IP finder clean frequency. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setIpFinderCleanFrequency(long ipFinderCleanFreq) { + this.ipFinderCleanFreq = ipFinderCleanFreq; + } + + + /** + * This method is intended for troubleshooting purposes only. + * + * @param debugMode {@code True} to start SPI in debug mode. + */ + public void setDebugMode(boolean debugMode) { + this.debugMode = debugMode; + } + + public boolean isDebugMode() { + return debugMode; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java index 1df3bdc..31a6f1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java @@ -35,6 +35,12 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde @GridToStringExclude private volatile IgniteSpiContext spiCtx; + protected TcpDiscoveryIpFinderAdapter(TcpDiscoveryIpFinderAdapterConfiguration cfg) { + 
A.ensure(cfg.getClass().equals(this.getClass()), "configuration must have the same class that ip finder"); + + shared = cfg.isShared(); + } + /** {@inheritDoc} */ @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException { this.spiCtx = spiCtx; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapterConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapterConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapterConfiguration.java new file mode 100644 index 0000000..0a1ddad --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapterConfiguration.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder; + +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.IgniteSpiConfigurationProperty; +import org.apache.ignite.spi.IgniteSpiContext; +import org.apache.ignite.spi.IgniteSpiException; + +import java.net.InetSocketAddress; +import java.util.Collection; + +/** + * IP finder interface implementation adapter. + */ +public abstract class TcpDiscoveryIpFinderAdapterConfiguration<T extends TcpDiscoveryIpFinderAdapter> + implements TcpDiscoveryIpFinderConfiguration<T> { + /** Shared flag. */ + private boolean shared; + + public boolean isShared() { + return shared; + } + + /** + * Sets shared flag. If {@code true} then it is expected that IP addresses registered + * with IP finder will be seen by IP finders on all other nodes. + * + * @param shared {@code true} if this IP finder is shared. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setShared(boolean shared) { + this.shared = shared; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderConfiguration.java new file mode 100644 index 0000000..566ae2b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderConfiguration.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder; + +/** + * TODO: Add javadoc + */ +public interface TcpDiscoveryIpFinderConfiguration<T extends TcpDiscoveryIpFinder> { + /** + * + */ + public Class<? extends T> getTcpDiscoveryIpFinderClass(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java index 855dd3d..71ab6be 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java @@ -87,11 +87,16 @@ public class TcpDiscoveryJdbcIpFinder extends TcpDiscoveryIpFinderAdapter { @GridToStringExclude private final CountDownLatch initLatch = new CountDownLatch(1); + /** * Constructor. 
*/ - public TcpDiscoveryJdbcIpFinder() { - setShared(true); + public TcpDiscoveryJdbcIpFinder(TcpDiscoveryJdbcIpFinderConfiguration cfg) { + super(cfg); + + setShared(cfg.isShared()); + dataSrc = cfg.getDataSrc(); + initSchema = cfg.isInitSchema(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderConfiguration.java new file mode 100644 index 0000000..1371158 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderConfiguration.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc; + +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.IgniteLoggerResource; +import org.apache.ignite.spi.IgniteSpiConfigurationProperty; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapterConfiguration; + +import javax.sql.DataSource; +import java.net.InetSocketAddress; +import java.sql.*; +import java.util.Collection; +import java.util.LinkedList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.sql.Connection.TRANSACTION_READ_COMMITTED; + +/** + * JDBC-based IP finder. + * <h1 class="header">Configuration</h1> + * <h2 class="header">Mandatory</h2> + * <ul> + * <li>Data source (see {@link #setDataSource(javax.sql.DataSource)}).</li> + * </ul> + * <h2 class="header">Optional</h2> + * The following configuration parameters are optional: + * <ul> + * <li>Flag indicating whether DB schema should be initialized by GridGain (default behaviour) or + * was explicitly created by user (see {@link #setInitSchema(boolean)})</li> + * </ul> + * <p> + * The database will contain 1 table which will hold IP addresses. + */ +public class TcpDiscoveryJdbcIpFinderConfiguration + extends TcpDiscoveryIpFinderAdapterConfiguration<TcpDiscoveryJdbcIpFinder> { + /** Data source. */ + private DataSource dataSrc; + + /** Flag for schema initialization. 
*/ + private boolean initSchema = true; + + public TcpDiscoveryJdbcIpFinderConfiguration() { + setShared(true); + } + + /** {@inheritDoc} */ + @Override public Class<TcpDiscoveryJdbcIpFinder> getTcpDiscoveryIpFinderClass() { + return TcpDiscoveryJdbcIpFinder.class; + } + + /** + * Sets data source. + * <p> + * Data source should be fully configured and ready-to-use. + * + * @param dataSrc Data source. + */ + @IgniteSpiConfigurationProperty(optional = false) + public void setDataSource(DataSource dataSrc) { + this.dataSrc = dataSrc; + } + + /** + * Flag indicating whether DB schema should be initialized by GridGain (default behaviour) or + * was explicitly created by user. + * + * @param initSchema {@code True} if DB schema should be initialized by GridGain (default behaviour), + * {@code false} if schema was explicitly created by user. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setInitSchema(boolean initSchema) { + this.initSchema = initSchema; + } + + public DataSource getDataSrc() { + return dataSrc; + } + + public boolean isInitSchema() { + return initSchema; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java index 278e925..78d0e6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java @@ -109,8 +109,21 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { /** * 
Constructs new IP finder. */ - public TcpDiscoveryMulticastIpFinder() { - setShared(true); + public TcpDiscoveryMulticastIpFinder() throws IgniteSpiException { + this(new TcpDiscoveryMulticastIpFinderConfiguration()); + } + + /** + * Constructs new IP finder. + */ + public TcpDiscoveryMulticastIpFinder(TcpDiscoveryMulticastIpFinderConfiguration cfg) throws IgniteSpiException { + super(cfg); + + addrReqAttempts = cfg.getAddressRequestAttempts(); + locAddr = cfg.getLocalAddress(); + mcastGrp = cfg.getMulticastGroup(); + mcastPort = cfg.getMulticastPort(); + resWaitTime = cfg.getResponseWaitTime(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderConfiguration.java new file mode 100644 index 0000000..41cf04d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderConfiguration.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder.multicast; + +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + +/** + * + */ +public class TcpDiscoveryMulticastIpFinderConfiguration extends TcpDiscoveryVmIpFinderConfiguration { + /** Default multicast IP address (value is {@code 228.1.2.4}). */ + public static final String DFLT_MCAST_GROUP = "228.1.2.4"; + + /** Default multicast port number (value is {@code 47400}). */ + public static final int DFLT_MCAST_PORT = 47400; + + /** Default time IP finder waits for reply to multicast address request (value is {@code 500}). */ + public static final int DFLT_RES_WAIT_TIME = 500; + + /** Default number of attempts to send multicast address request (value is {@code 2}). */ + public static final int DFLT_ADDR_REQ_ATTEMPTS = 2; + + /** Multicast IP address as string. */ + private String mcastGrp = DFLT_MCAST_GROUP; + + /** Multicast port number. */ + private int mcastPort = DFLT_MCAST_PORT; + + /** Time IP finder waits for reply to multicast address request. */ + private int resWaitTime = DFLT_RES_WAIT_TIME; + + /** Number of attempts to send multicast address request. */ + private int addrReqAttempts = DFLT_ADDR_REQ_ATTEMPTS; + + /** Local address. */ + private String locAddr; + + /** + * Constructs new IP finder configuration. 
+ */ + public TcpDiscoveryMulticastIpFinderConfiguration() { + setShared(true); + } + + /** {@inheritDoc} */ + @Override public Class<TcpDiscoveryMulticastIpFinder> getTcpDiscoveryIpFinderClass() { + return TcpDiscoveryMulticastIpFinder.class; + } + + /** + * Sets IP address of multicast group. + * <p> + * If not provided, default value is {@link #DFLT_MCAST_GROUP}. + * + * @param mcastGrp Multicast IP address. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setMulticastGroup(String mcastGrp) { + this.mcastGrp = mcastGrp; + } + + /** + * Gets IP address of multicast group. + * + * @return Multicast IP address. + */ + public String getMulticastGroup() { + return mcastGrp; + } + + /** + * Sets port number which multicast messages are sent to. + * <p> + * If not provided, default value is {@link #DFLT_MCAST_PORT}. + * + * @param mcastPort Multicast port number. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setMulticastPort(int mcastPort) { + this.mcastPort = mcastPort; + } + + /** + * Gets port number which multicast messages are sent to. + * + * @return Port number. + */ + public int getMulticastPort() { + return mcastPort; + } + + /** + * Sets time in milliseconds IP finder waits for reply to + * multicast address request. + * <p> + * If not provided, default value is {@link #DFLT_RES_WAIT_TIME}. + * + * @param resWaitTime Time IP finder waits for reply to multicast address request. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setResponseWaitTime(int resWaitTime) { + this.resWaitTime = resWaitTime; + } + + /** + * Gets time in milliseconds IP finder waits for reply to + * multicast address request. + * + * @return Time IP finder waits for reply to multicast address request. + */ + public int getResponseWaitTime() { + return resWaitTime; + } + + /** + * Sets number of attempts to send multicast address request. 
IP finder re-sends + * request only in case if no reply for previous request is received. + * <p> + * If not provided, default value is {@link #DFLT_ADDR_REQ_ATTEMPTS}. + * + * @param addrReqAttempts Number of attempts to send multicast address request. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setAddressRequestAttempts(int addrReqAttempts) { + this.addrReqAttempts = addrReqAttempts; + } + + /** + * Gets number of attempts to send multicast address request. IP finder re-sends + * request only in case if no reply for previous request is received. + * + * @return Number of attempts to send multicast address request. + */ + public int getAddressRequestAttempts() { + return addrReqAttempts; + } + + /** + * Sets local host address used by this IP finder. If provided address is non-loopback then multicast + * socket is bound to this interface. If local address is not set or is any local address then IP finder + * creates multicast sockets for all found non-loopback addresses. + * <p> + * If not provided then this property is initialized by the local address set in {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} + * configuration. + * + * @param locAddr Local host address. + * @see org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi#setLocalAddress(String) + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setLocalAddress(String locAddr) { + this.locAddr = locAddr; + } + + /** + * Gets local address that multicast IP finder uses. + * + * @return Local address. 
+ */ + public String getLocalAddress() { + return locAddr; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java index dcca957..8ef3ec6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java @@ -86,11 +86,17 @@ public class TcpDiscoverySharedFsIpFinder extends TcpDiscoveryIpFinderAdapter { @GridToStringExclude private final CountDownLatch initLatch = new CountDownLatch(1); + public TcpDiscoverySharedFsIpFinder() { + super(new TcpDiscoverySharedFsIpFinderConfiguration()); + } + /** * Constructor. 
*/ - public TcpDiscoverySharedFsIpFinder() { - setShared(true); + public TcpDiscoverySharedFsIpFinder(TcpDiscoverySharedFsIpFinderConfiguration cfg) { + super(cfg); + + path = cfg.getPath(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinderConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinderConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinderConfiguration.java new file mode 100644 index 0000000..167d8e1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinderConfiguration.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.SB; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.IgniteLoggerResource; +import org.apache.ignite.spi.IgniteSpiConfigurationProperty; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapterConfiguration; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.StringTokenizer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * + */ +public class TcpDiscoverySharedFsIpFinderConfiguration + extends TcpDiscoveryIpFinderAdapterConfiguration<TcpDiscoverySharedFsIpFinder> { + /** + * Default path for discovering of local nodes (testing only). Note that this path is relative to + * {@code IGNITE_HOME/work} folder if {@code IGNITE_HOME} system or environment variable specified, + * otherwise it is relative to {@code work} folder under system {@code java.io.tmpdir} folder. + * + * @see org.apache.ignite.configuration.IgniteConfiguration#getWorkDirectory() + */ + public static final String DFLT_PATH = "disco/tcp"; + + /** File-system path. */ + private String path = DFLT_PATH; + + /** + * Constructor. 
+ */ + public TcpDiscoverySharedFsIpFinderConfiguration() { + setShared(true); + } + + /** {@inheritDoc} */ + @Override public Class<? extends TcpDiscoverySharedFsIpFinder> getTcpDiscoveryIpFinderClass() { + return TcpDiscoverySharedFsIpFinder.class; + } + + /** + * Gets path. + * + * @return Shared path. + */ + public String getPath() { + return path; + } + + /** + * Sets path. + * + * @param path Shared path. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setPath(String path) { + this.path = path; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java index 40234bd..97cf67a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java @@ -86,18 +86,10 @@ public class TcpDiscoveryVmIpFinder extends TcpDiscoveryIpFinderAdapter { /** * Constructs new IP finder. */ - public TcpDiscoveryVmIpFinder() { - // No-op. - } + public TcpDiscoveryVmIpFinder(TcpDiscoveryVmIpFinderConfiguration cfg) throws IgniteSpiException { + super(cfg); - /** - * Constructs new IP finder. - * - * @param shared {@code true} if IP finder is shared. 
- * @see #setShared(boolean) - */ - public TcpDiscoveryVmIpFinder(boolean shared) { - setShared(shared); + setAddresses(cfg.getAddrs()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfd9ad/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinderConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinderConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinderConfiguration.java new file mode 100644 index 0000000..3ef0afc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinderConfiguration.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder.vm; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.resources.IgniteLoggerResource; +import org.apache.ignite.spi.IgniteSpiConfigurationProperty; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapterConfiguration; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_TCP_DISCOVERY_ADDRESSES; + +/** + * + */ +public class TcpDiscoveryVmIpFinderConfiguration + extends TcpDiscoveryIpFinderAdapterConfiguration<TcpDiscoveryVmIpFinder> { + /** Addresses. */ + private Collection<String> addrs; + + /** {@inheritDoc} */ + @Override public Class<? extends TcpDiscoveryVmIpFinder> getTcpDiscoveryIpFinderClass() { + return TcpDiscoveryVmIpFinder.class; + } + + public TcpDiscoveryVmIpFinderConfiguration() { + } + + public TcpDiscoveryVmIpFinderConfiguration(boolean isShared) { + setShared(isShared); + } + + /** + * Parses provided values and initializes the internal collection of addresses. + * <p> + * Addresses may be represented as follows: + * <ul> + * <li>IP address (e.g. 127.0.0.1, 9.9.9.9, etc);</li> + * <li>IP address and port (e.g. 127.0.0.1:47500, 9.9.9.9:47501, etc);</li> + * <li>IP address and port range (e.g. 127.0.0.1:47500..47510, 9.9.9.9:47501..47504, etc);</li> + * <li>Hostname (e.g. host1.com, host2, etc);</li> + * <li>Hostname and port (e.g. 
host1.com:47500, host2:47502, etc).</li> + * <li>Hostname and port range (e.g. host1.com:47500..47510, host2:47502..47508, etc).</li> + * </ul> + * <p> + * If port is 0 or not provided then default port will be used (depends on + * discovery SPI configuration). + * <p> + * If port range is provided (e.g. host:port1..port2) the following should be considered: + * <ul> + * <li>{@code port1 < port2} should be {@code true};</li> + * <li>Both {@code port1} and {@code port2} should be greater than {@code 0}.</li> + * </ul> + * + * @param addrs Known nodes addresses. + * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. + */ + @IgniteSpiConfigurationProperty(optional = true) + public void setAddresses(Collection<String> addrs) throws IgniteSpiException { + this.addrs = addrs; + } + + public Collection<String> getAddrs() { + return addrs; + } +}