# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4f31df32 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4f31df32 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4f31df32 Branch: refs/heads/master Commit: 4f31df32c1efa257555c2ae9a154d8e4fa943864 Parents: 717180b Author: sboikov <sboi...@gridgain.com> Authored: Fri Dec 5 16:50:11 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Dec 5 16:50:11 2014 +0300 ---------------------------------------------------------------------- .../store/CacheNodeWithStoreStartup.java | 2 +- .../org/gridgain/grid/kernal/GridGainEx.java | 2 +- .../discovery/tcp/TcpClientDiscoverySpi.java | 4 +- .../grid/spi/discovery/tcp/TcpDiscoverySpi.java | 12 +- .../discovery/tcp/TcpDiscoverySpiAdapter.java | 3 +- .../jdbc/GridTcpDiscoveryJdbcIpFinder.java | 361 --------- .../ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java | 361 +++++++++ .../GridTcpDiscoveryMulticastIpFinder.java | 752 ------------------- .../TcpDiscoveryMulticastIpFinder.java | 752 +++++++++++++++++++ .../GridTcpDiscoverySharedFsIpFinder.java | 258 ------- .../sharedfs/TcpDiscoverySharedFsIpFinder.java | 258 +++++++ .../test/config/loaders/grid-cfg-2-grids.xml | 4 +- .../core/src/test/config/loaders/grid-cfg.xml | 4 +- .../discovery/tcp/GridTcpDiscoverySelfTest.java | 4 +- .../GridTcpDiscoveryJdbcIpFinderSelfTest.java | 6 +- ...idTcpDiscoveryMulticastIpFinderSelfTest.java | 16 +- ...ridTcpDiscoverySharedFsIpFinderSelfTest.java | 6 +- .../testframework/junits/GridAbstractTest.java | 2 +- .../junits/spi/GridSpiAbstractTest.java | 6 +- 19 files changed, 1405 insertions(+), 1408 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f31df32/examples/src/main/java/org/gridgain/examples/datagrid/store/CacheNodeWithStoreStartup.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/store/CacheNodeWithStoreStartup.java b/examples/src/main/java/org/gridgain/examples/datagrid/store/CacheNodeWithStoreStartup.java index 2bf0cfd..9035376 100644 --- a/examples/src/main/java/org/gridgain/examples/datagrid/store/CacheNodeWithStoreStartup.java +++ b/examples/src/main/java/org/gridgain/examples/datagrid/store/CacheNodeWithStoreStartup.java @@ -50,7 +50,7 @@ public class CacheNodeWithStoreStartup { // Discovery SPI. TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - TcpDiscoveryVmIpFinder ipFinder = new GridTcpDiscoveryMulticastIpFinder(); + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder(); ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47509")); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f31df32/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java index 178d323..5469e86 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java @@ -1676,7 +1676,7 @@ public class GridGainEx { TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi)discoSpi; if (tcpDisco.getIpFinder() == null) - tcpDisco.setIpFinder(new GridTcpDiscoveryMulticastIpFinder()); + tcpDisco.setIpFinder(new TcpDiscoveryMulticastIpFinder()); } if (evtSpi == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f31df32/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpClientDiscoverySpi.java index ff50969..bb3a2ca 100644 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -228,8 +228,8 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp throw new IgniteSpiException("Unknown local address: " + locAddr, e); } - if (ipFinder instanceof GridTcpDiscoveryMulticastIpFinder) { - GridTcpDiscoveryMulticastIpFinder mcastIpFinder = ((GridTcpDiscoveryMulticastIpFinder)ipFinder); + if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) { + TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder); if (mcastIpFinder.getLocalAddress() == null) mcastIpFinder.setLocalAddress(locAddr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f31df32/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java index 7a1b964..acd2599 100644 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java @@ -22,9 +22,7 @@ import org.gridgain.grid.kernal.managers.security.*; import org.gridgain.grid.security.*; import org.gridgain.grid.spi.discovery.*; import org.gridgain.grid.spi.discovery.tcp.internal.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.jdbc.*; import org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.sharedfs.*; import org.gridgain.grid.spi.discovery.tcp.messages.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.future.*; @@ -68,11 +66,11 @@ import static org.gridgain.grid.spi.discovery.tcp.messages.GridTcpDiscoveryStatu * (see {@link #setIpFinder(org.gridgain.grid.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder)}). * See the following IP finder implementations for details on configuration: * <ul> - * <li>{@link GridTcpDiscoverySharedFsIpFinder}</li> + * <li>{@link org.gridgain.grid.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder}</li> * <li>{@gglink org.gridgain.grid.spi.discovery.tcp.ipfinder.s3.GridTcpDiscoveryS3IpFinder}</li> - * <li>{@link GridTcpDiscoveryJdbcIpFinder}</li> + * <li>{@link org.gridgain.grid.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder}</li> * <li>{@link org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder}</li> - * <li>{@link GridTcpDiscoveryMulticastIpFinder} - default</li> + * <li>{@link org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} - default</li> * </ul> * </li> * </ul> @@ -853,8 +851,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov registerMBean(gridName, this, TcpDiscoverySpiMBean.class); - if (ipFinder instanceof GridTcpDiscoveryMulticastIpFinder) { - GridTcpDiscoveryMulticastIpFinder mcastIpFinder = ((GridTcpDiscoveryMulticastIpFinder)ipFinder); + if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) { + TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder); if (mcastIpFinder.getLocalAddress() == null) mcastIpFinder.setLocalAddress(locAddr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f31df32/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiAdapter.java index 03b422a..47ee030 100644 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiAdapter.java @@ -20,7 +20,6 @@ import org.gridgain.grid.*; import org.gridgain.grid.spi.discovery.*; import org.gridgain.grid.spi.discovery.tcp.internal.*; import org.gridgain.grid.spi.discovery.tcp.ipfinder.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast.*; import org.gridgain.grid.spi.discovery.tcp.messages.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.io.*; @@ -183,7 +182,7 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov /** * Sets IP finder for IP addresses sharing and storing. * <p> - * If not provided {@link GridTcpDiscoveryMulticastIpFinder} will be used by default. + * If not provided {@link org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default. * * @param ipFinder IP finder. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f31df32/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/GridTcpDiscoveryJdbcIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/GridTcpDiscoveryJdbcIpFinder.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/GridTcpDiscoveryJdbcIpFinder.java deleted file mode 100644 index 0e1a15b..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/GridTcpDiscoveryJdbcIpFinder.java +++ /dev/null @@ -1,361 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.ipfinder.jdbc; - -import org.apache.ignite.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.gridgain.grid.util.tostring.*; - -import javax.sql.*; -import java.net.*; -import java.sql.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.sql.Connection.*; - -/** - * JDBC-based IP finder. - * <h1 class="header">Configuration</h1> - * <h2 class="header">Mandatory</h2> - * <ul> - * <li>Data source (see {@link #setDataSource(DataSource)}).</li> - * </ul> - * <h2 class="header">Optional</h2> - * The following configuration parameters are optional: - * <ul> - * <li>Flag indicating whether DB schema should be initialized by GridGain (default behaviour) or - * was explicitly created by user (see {@link #setInitSchema(boolean)})</li> - * </ul> - * <p> - * The database will contain 1 table which will hold IP addresses. - */ -public class GridTcpDiscoveryJdbcIpFinder extends TcpDiscoveryIpFinderAdapter { - /** Query to get addresses. */ - public static final String GET_ADDRS_QRY = "select hostname, port from tbl_addrs"; - - /** Query to register address. */ - public static final String REG_ADDR_QRY = "insert into tbl_addrs values (?, ?)"; - - /** Query to unregister address. */ - public static final String UNREG_ADDR_QRY = "delete from tbl_addrs where hostname = ? and port = ?"; - - /** Query to create addresses table. */ - public static final String CREATE_ADDRS_TABLE_QRY = - "create table if not exists tbl_addrs (" + - "hostname VARCHAR(1024), " + - "port INT)"; - - /** Query to check database validity. */ - public static final String CHK_QRY = "select count(*) from tbl_addrs"; - - /** Grid logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** Data source. */ - private DataSource dataSrc; - - /** Flag for schema initialization. */ - private boolean initSchema = true; - - /** Init guard. */ - @GridToStringExclude - private final AtomicBoolean initGuard = new AtomicBoolean(); - - /** Init latch. */ - @GridToStringExclude - private final CountDownLatch initLatch = new CountDownLatch(1); - - /** - * Constructor. - */ - public GridTcpDiscoveryJdbcIpFinder() { - setShared(true); - } - - /** {@inheritDoc} */ - @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException { - init(); - - Connection conn = null; - - PreparedStatement stmt = null; - - ResultSet rs = null; - - try { - conn = dataSrc.getConnection(); - - conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); - - stmt = conn.prepareStatement(GET_ADDRS_QRY); - - rs = stmt.executeQuery(); - - Collection<InetSocketAddress> addrs = new LinkedList<>(); - - while (rs.next()) - addrs.add(new InetSocketAddress(rs.getString(1), rs.getInt(2))); - - return addrs; - } - catch (SQLException e) { - throw new IgniteSpiException("Failed to get registered addresses version.", e); - } - finally { - U.closeQuiet(rs); - U.closeQuiet(stmt); - U.closeQuiet(conn); - } - } - - /** {@inheritDoc} */ - @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { - assert !F.isEmpty(addrs); - - init(); - - Connection conn = null; - - PreparedStatement stmtUnreg = null; - - PreparedStatement stmtReg = null; - - boolean committed = false; - - try { - conn = dataSrc.getConnection(); - - conn.setAutoCommit(false); - - conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); - - stmtUnreg = conn.prepareStatement(UNREG_ADDR_QRY); - stmtReg = conn.prepareStatement(REG_ADDR_QRY); - - for (InetSocketAddress addr : addrs) { - stmtUnreg.setString(1, addr.getAddress().getHostAddress()); - stmtUnreg.setInt(2, addr.getPort()); - - stmtUnreg.addBatch(); - - stmtReg.setString(1, addr.getAddress().getHostAddress()); - stmtReg.setInt(2, addr.getPort()); - - stmtReg.addBatch(); - } - - stmtUnreg.executeBatch(); - stmtUnreg.close(); - - stmtReg.executeBatch(); - stmtReg.close(); - - conn.commit(); - - committed = true; - } - catch (SQLException e) { - U.rollbackConnectionQuiet(conn); - - throw new IgniteSpiException("Failed to register addresses: " + addrs, e); - } - finally { - if (!committed) - U.rollbackConnectionQuiet(conn); - - U.closeQuiet(stmtUnreg); - U.closeQuiet(stmtReg); - U.closeQuiet(conn); - } - } - - /** {@inheritDoc} */ - @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { - assert !F.isEmpty(addrs); - - init(); - - Connection conn = null; - - PreparedStatement stmt = null; - - boolean committed = false; - - try { - conn = dataSrc.getConnection(); - - conn.setAutoCommit(false); - - conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); - - stmt = conn.prepareStatement(UNREG_ADDR_QRY); - - for (InetSocketAddress addr : addrs) { - stmt.setString(1, addr.getAddress().getHostAddress()); - stmt.setInt(2, addr.getPort()); - - stmt.addBatch(); - } - - stmt.executeBatch(); - conn.commit(); - - committed = true; - } - catch (SQLException e) { - U.rollbackConnectionQuiet(conn); - - throw new IgniteSpiException("Failed to unregister addresses: " + addrs, e); - } - finally { - if (!committed) - U.rollbackConnectionQuiet(conn); - - U.closeQuiet(stmt); - U.closeQuiet(conn); - } - } - - /** - * Sets data source. - * <p> - * Data source should be fully configured and ready-to-use. - * - * @param dataSrc Data source. - */ - @IgniteSpiConfiguration(optional = false) - public void setDataSource(DataSource dataSrc) { - this.dataSrc = dataSrc; - } - - /** - * Flag indicating whether DB schema should be initialized by GridGain (default behaviour) or - * was explicitly created by user. - * - * @param initSchema {@code True} if DB schema should be initialized by GridGain (default behaviour), - * {code @false} if schema was explicitly created by user. - */ - @IgniteSpiConfiguration(optional = true) - public void setInitSchema(boolean initSchema) { - this.initSchema = initSchema; - } - - /** - * Checks configuration validity. - * - * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. - */ - private void init() throws IgniteSpiException { - if (initGuard.compareAndSet(false, true)) { - if (dataSrc == null) - throw new IgniteSpiException("Data source is null (you must configure it via setDataSource(..)" + - " configuration property)"); - - if (!initSchema) { - initLatch.countDown(); - - checkSchema(); - - return; - } - - Connection conn = null; - - Statement stmt = null; - - boolean committed = false; - - try { - conn = dataSrc.getConnection(); - - conn.setAutoCommit(false); - - conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); - - // Create tbl_addrs. - stmt = conn.createStatement(); - - stmt.executeUpdate(CREATE_ADDRS_TABLE_QRY); - - conn.commit(); - - committed = true; - - if (log.isDebugEnabled()) - log.debug("DB schema has been initialized."); - } - catch (SQLException e) { - U.rollbackConnectionQuiet(conn); - - throw new IgniteSpiException("Failed to initialize DB schema.", e); - } - finally { - if (!committed) - U.rollbackConnectionQuiet(conn); - - U.closeQuiet(stmt); - U.closeQuiet(conn); - - initLatch.countDown(); - } - } - else - checkSchema(); - } - - /** - * Checks correctness of existing DB schema. - * - * @throws org.apache.ignite.spi.IgniteSpiException If schema wasn't properly initialized. - */ - private void checkSchema() throws IgniteSpiException { - try { - U.await(initLatch); - } - catch (GridInterruptedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - - Connection conn = null; - - Statement stmt = null; - - try { - conn = dataSrc.getConnection(); - - conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); - - // Check if tbl_addrs exists and database initialized properly. - stmt = conn.createStatement(); - - stmt.execute(CHK_QRY); - } - catch (SQLException e) { - throw new IgniteSpiException("IP finder has not been properly initialized.", e); - } - finally { - U.closeQuiet(stmt); - U.closeQuiet(conn); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridTcpDiscoveryJdbcIpFinder.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f31df32/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java new file mode 100644 index 0000000..4b5c00b --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java @@ -0,0 +1,361 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.discovery.tcp.ipfinder.jdbc; + +import org.apache.ignite.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.gridgain.grid.spi.discovery.tcp.ipfinder.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.grid.util.tostring.*; + +import javax.sql.*; +import java.net.*; +import java.sql.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.sql.Connection.*; + +/** + * JDBC-based IP finder. + * <h1 class="header">Configuration</h1> + * <h2 class="header">Mandatory</h2> + * <ul> + * <li>Data source (see {@link #setDataSource(DataSource)}).</li> + * </ul> + * <h2 class="header">Optional</h2> + * The following configuration parameters are optional: + * <ul> + * <li>Flag indicating whether DB schema should be initialized by GridGain (default behaviour) or + * was explicitly created by user (see {@link #setInitSchema(boolean)})</li> + * </ul> + * <p> + * The database will contain 1 table which will hold IP addresses. + */ +public class TcpDiscoveryJdbcIpFinder extends TcpDiscoveryIpFinderAdapter { + /** Query to get addresses. */ + public static final String GET_ADDRS_QRY = "select hostname, port from tbl_addrs"; + + /** Query to register address. */ + public static final String REG_ADDR_QRY = "insert into tbl_addrs values (?, ?)"; + + /** Query to unregister address. */ + public static final String UNREG_ADDR_QRY = "delete from tbl_addrs where hostname = ? and port = ?"; + + /** Query to create addresses table. */ + public static final String CREATE_ADDRS_TABLE_QRY = + "create table if not exists tbl_addrs (" + + "hostname VARCHAR(1024), " + + "port INT)"; + + /** Query to check database validity. */ + public static final String CHK_QRY = "select count(*) from tbl_addrs"; + + /** Grid logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Data source. */ + private DataSource dataSrc; + + /** Flag for schema initialization. */ + private boolean initSchema = true; + + /** Init guard. */ + @GridToStringExclude + private final AtomicBoolean initGuard = new AtomicBoolean(); + + /** Init latch. */ + @GridToStringExclude + private final CountDownLatch initLatch = new CountDownLatch(1); + + /** + * Constructor. + */ + public TcpDiscoveryJdbcIpFinder() { + setShared(true); + } + + /** {@inheritDoc} */ + @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException { + init(); + + Connection conn = null; + + PreparedStatement stmt = null; + + ResultSet rs = null; + + try { + conn = dataSrc.getConnection(); + + conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); + + stmt = conn.prepareStatement(GET_ADDRS_QRY); + + rs = stmt.executeQuery(); + + Collection<InetSocketAddress> addrs = new LinkedList<>(); + + while (rs.next()) + addrs.add(new InetSocketAddress(rs.getString(1), rs.getInt(2))); + + return addrs; + } + catch (SQLException e) { + throw new IgniteSpiException("Failed to get registered addresses version.", e); + } + finally { + U.closeQuiet(rs); + U.closeQuiet(stmt); + U.closeQuiet(conn); + } + } + + /** {@inheritDoc} */ + @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + assert !F.isEmpty(addrs); + + init(); + + Connection conn = null; + + PreparedStatement stmtUnreg = null; + + PreparedStatement stmtReg = null; + + boolean committed = false; + + try { + conn = dataSrc.getConnection(); + + conn.setAutoCommit(false); + + conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); + + stmtUnreg = conn.prepareStatement(UNREG_ADDR_QRY); + stmtReg = conn.prepareStatement(REG_ADDR_QRY); + + for (InetSocketAddress addr : addrs) { + stmtUnreg.setString(1, addr.getAddress().getHostAddress()); + stmtUnreg.setInt(2, addr.getPort()); + + stmtUnreg.addBatch(); + + stmtReg.setString(1, addr.getAddress().getHostAddress()); + stmtReg.setInt(2, addr.getPort()); + + stmtReg.addBatch(); + } + + stmtUnreg.executeBatch(); + stmtUnreg.close(); + + stmtReg.executeBatch(); + stmtReg.close(); + + conn.commit(); + + committed = true; + } + catch (SQLException e) { + U.rollbackConnectionQuiet(conn); + + throw new IgniteSpiException("Failed to register addresses: " + addrs, e); + } + finally { + if (!committed) + U.rollbackConnectionQuiet(conn); + + U.closeQuiet(stmtUnreg); + U.closeQuiet(stmtReg); + U.closeQuiet(conn); + } + } + + /** {@inheritDoc} */ + @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + assert !F.isEmpty(addrs); + + init(); + + Connection conn = null; + + PreparedStatement stmt = null; + + boolean committed = false; + + try { + conn = dataSrc.getConnection(); + + conn.setAutoCommit(false); + + conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); + + stmt = conn.prepareStatement(UNREG_ADDR_QRY); + + for (InetSocketAddress addr : addrs) { + stmt.setString(1, addr.getAddress().getHostAddress()); + stmt.setInt(2, addr.getPort()); + + stmt.addBatch(); + } + + stmt.executeBatch(); + conn.commit(); + + committed = true; + } + catch (SQLException e) { + U.rollbackConnectionQuiet(conn); + + throw new IgniteSpiException("Failed to unregister addresses: " + addrs, e); + } + finally { + if (!committed) + U.rollbackConnectionQuiet(conn); + + U.closeQuiet(stmt); + U.closeQuiet(conn); + } + } + + /** + * Sets data source. + * <p> + * Data source should be fully configured and ready-to-use. + * + * @param dataSrc Data source. + */ + @IgniteSpiConfiguration(optional = false) + public void setDataSource(DataSource dataSrc) { + this.dataSrc = dataSrc; + } + + /** + * Flag indicating whether DB schema should be initialized by GridGain (default behaviour) or + * was explicitly created by user. + * + * @param initSchema {@code True} if DB schema should be initialized by GridGain (default behaviour), + * {code @false} if schema was explicitly created by user. + */ + @IgniteSpiConfiguration(optional = true) + public void setInitSchema(boolean initSchema) { + this.initSchema = initSchema; + } + + /** + * Checks configuration validity. + * + * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. + */ + private void init() throws IgniteSpiException { + if (initGuard.compareAndSet(false, true)) { + if (dataSrc == null) + throw new IgniteSpiException("Data source is null (you must configure it via setDataSource(..)" + + " configuration property)"); + + if (!initSchema) { + initLatch.countDown(); + + checkSchema(); + + return; + } + + Connection conn = null; + + Statement stmt = null; + + boolean committed = false; + + try { + conn = dataSrc.getConnection(); + + conn.setAutoCommit(false); + + conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); + + // Create tbl_addrs. + stmt = conn.createStatement(); + + stmt.executeUpdate(CREATE_ADDRS_TABLE_QRY); + + conn.commit(); + + committed = true; + + if (log.isDebugEnabled()) + log.debug("DB schema has been initialized."); + } + catch (SQLException e) { + U.rollbackConnectionQuiet(conn); + + throw new IgniteSpiException("Failed to initialize DB schema.", e); + } + finally { + if (!committed) + U.rollbackConnectionQuiet(conn); + + U.closeQuiet(stmt); + U.closeQuiet(conn); + + initLatch.countDown(); + } + } + else + checkSchema(); + } + + /** + * Checks correctness of existing DB schema. + * + * @throws org.apache.ignite.spi.IgniteSpiException If schema wasn't properly initialized. + */ + private void checkSchema() throws IgniteSpiException { + try { + U.await(initLatch); + } + catch (GridInterruptedException e) { + throw new IgniteSpiException("Thread has been interrupted.", e); + } + + Connection conn = null; + + Statement stmt = null; + + try { + conn = dataSrc.getConnection(); + + conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED); + + // Check if tbl_addrs exists and database initialized properly. + stmt = conn.createStatement(); + + stmt.execute(CHK_QRY); + } + catch (SQLException e) { + throw new IgniteSpiException("IP finder has not been properly initialized.", e); + } + finally { + U.closeQuiet(stmt); + U.closeQuiet(conn); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryJdbcIpFinder.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f31df32/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/GridTcpDiscoveryMulticastIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/GridTcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/GridTcpDiscoveryMulticastIpFinder.java deleted file mode 100644 index 1c4a5f9..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/GridTcpDiscoveryMulticastIpFinder.java +++ /dev/null @@ -1,752 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast; - -import org.apache.ignite.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.jdk.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.gridgain.grid.util.tostring.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -import static org.apache.ignite.IgniteSystemProperties.*; -import static org.apache.ignite.spi.IgnitePortProtocol.*; - -/** - * Multicast-based IP finder. - * <p> - * When TCP discovery starts this finder sends multicast request and waits - * for some time when others nodes reply to this request with messages containing - * their addresses (time IP finder waits for response and number of attempts to - * re-send multicast request in case if no replies are received can be configured, - * see {@link #setResponseWaitTime(int)} and {@link #setAddressRequestAttempts(int)}). - * <p> - * In addition to address received via multicast this finder can work with pre-configured - * list of addresses specified via {@link #setAddresses(Collection)} method. - * <h1 class="header">Configuration</h1> - * <h2 class="header">Mandatory</h2> - * There are no mandatory configuration parameters. - * <h2 class="header">Optional</h2> - * <ul> - * <li>Multicast IP address (see {@link #setMulticastGroup(String)}).</li> - * <li>Multicast port number (see {@link #setMulticastPort(int)}).</li> - * <li>Address response wait time (see {@link #setResponseWaitTime(int)}).</li> - * <li>Address request attempts (see {@link #setAddressRequestAttempts(int)}).</li> - * <li>Pre-configured addresses (see {@link #setAddresses(Collection)})</li> - * <li>Local address (see {@link #setLocalAddress(String)})</li> - * </ul> - */ -public class GridTcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { - /** Default multicast IP address (value is {@code 228.1.2.4}). */ - public static final String DFLT_MCAST_GROUP = "228.1.2.4"; - - /** Default multicast port number (value is {@code 47400}). */ - public static final int DFLT_MCAST_PORT = 47400; - - /** Default time IP finder waits for reply to multicast address request (value is {@code 500}). */ - public static final int DFLT_RES_WAIT_TIME = 500; - - /** Default number of attempts to send multicast address request (value is {@code 2}). */ - public static final int DFLT_ADDR_REQ_ATTEMPTS = 2; - - /** Address request message data. */ - private static final byte[] MSG_ADDR_REQ_DATA = U.GG_HEADER; - - /** */ - private static final IgniteMarshaller marsh = new IgniteJdkMarshaller(); - - /** Grid logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** Grid name. */ - @IgniteNameResource - @GridToStringExclude - private String gridName; - - /** Multicast IP address as string. */ - private String mcastGrp = DFLT_MCAST_GROUP; - - /** Multicast port number. */ - private int mcastPort = DFLT_MCAST_PORT; - - /** Time IP finder waits for reply to multicast address request. */ - private int resWaitTime = DFLT_RES_WAIT_TIME; - - /** Number of attempts to send multicast address request. */ - private int addrReqAttempts = DFLT_ADDR_REQ_ATTEMPTS; - - /** Local address */ - private String locAddr; - - /** */ - @GridToStringExclude - private Collection<AddressSender> addrSnds; - - /** - * Constructs new IP finder. - */ - public GridTcpDiscoveryMulticastIpFinder() { - setShared(true); - } - - /** - * Sets IP address of multicast group. - * <p> - * If not provided, default value is {@link #DFLT_MCAST_GROUP}. - * - * @param mcastGrp Multicast IP address. - */ - @IgniteSpiConfiguration(optional = true) - public void setMulticastGroup(String mcastGrp) { - this.mcastGrp = mcastGrp; - } - - /** - * Gets IP address of multicast group. - * - * @return Multicast IP address. - */ - public String getMulticastGroup() { - return mcastGrp; - } - - /** - * Sets port number which multicast messages are sent to. - * <p> - * If not provided, default value is {@link #DFLT_MCAST_PORT}. - * - * @param mcastPort Multicast port number. - */ - @IgniteSpiConfiguration(optional = true) - public void setMulticastPort(int mcastPort) { - this.mcastPort = mcastPort; - } - - /** - * Gets port number which multicast messages are sent to. - * - * @return Port number. - */ - public int getMulticastPort() { - return mcastPort; - } - - /** - * Sets time in milliseconds IP finder waits for reply to - * multicast address request. - * <p> - * If not provided, default value is {@link #DFLT_RES_WAIT_TIME}. - * - * @param resWaitTime Time IP finder waits for reply to multicast address request. - */ - @IgniteSpiConfiguration(optional = true) - public void setResponseWaitTime(int resWaitTime) { - this.resWaitTime = resWaitTime; - } - - /** - * Gets time in milliseconds IP finder waits for reply to - * multicast address request. - * - * @return Time IP finder waits for reply to multicast address request. - */ - public int getResponseWaitTime() { - return resWaitTime; - } - - /** - * Sets number of attempts to send multicast address request. IP finder re-sends - * request only in case if no reply for previous request is received. - * <p> - * If not provided, default value is {@link #DFLT_ADDR_REQ_ATTEMPTS}. - * - * @param addrReqAttempts Number of attempts to send multicast address request. - */ - @IgniteSpiConfiguration(optional = true) - public void setAddressRequestAttempts(int addrReqAttempts) { - this.addrReqAttempts = addrReqAttempts; - } - - /** - * Gets number of attempts to send multicast address request. IP finder re-sends - * request only in case if no reply for previous request is received. - * - * @return Number of attempts to send multicast address request. - */ - public int getAddressRequestAttempts() { - return addrReqAttempts; - } - - /** - * Sets local host address used by this IP finder. If provided address is non-loopback then multicast - * socket is bound to this interface. If local address is not set or is any local address then IP finder - * creates multicast sockets for all found non-loopback addresses. - * <p> - * If not provided then this property is initialized by the local address set in {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi} - * configuration. - * - * @param locAddr Local host address. - * @see org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi#setLocalAddress(String) - */ - @IgniteSpiConfiguration(optional = true) - public void setLocalAddress(String locAddr) { - this.locAddr = locAddr; - } - - /** - * Gets local address that multicast IP finder uses. - * - * @return Local address. - */ - public String getLocalAddress() { - return locAddr; - } - - /** {@inheritDoc} */ - @Override public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { - // If GRIDGAIN_OVERRIDE_MCAST_GRP system property is set, use its value to override multicast group from - // configuration. Used for testing purposes. - String overrideMcastGrp = System.getProperty(GG_OVERRIDE_MCAST_GRP); - - if (overrideMcastGrp != null) - mcastGrp = overrideMcastGrp; - - if (F.isEmpty(mcastGrp)) - throw new IgniteSpiException("Multicast IP address is not specified."); - - if (mcastPort < 0 || mcastPort > 65535) - throw new IgniteSpiException("Invalid multicast port: " + mcastPort); - - if (resWaitTime <= 0) - throw new IgniteSpiException("Invalid wait time, value greater than zero is expected: " + resWaitTime); - - if (addrReqAttempts <= 0) - throw new IgniteSpiException("Invalid number of address request attempts, " + - "value greater than zero is expected: " + addrReqAttempts); - - if (F.isEmpty(getRegisteredAddresses())) - U.warn(log, "GridTcpDiscoveryMulticastIpFinder has no pre-configured addresses " + - "(it is recommended in production to specify at least one address in " + - "GridTcpDiscoveryMulticastIpFinder.getAddresses() configuration property)"); - - InetAddress mcastAddr; - - try { - mcastAddr = InetAddress.getByName(mcastGrp); - } - catch (UnknownHostException e) { - throw new IgniteSpiException("Unknown multicast group: " + mcastGrp, e); - } - - if (!mcastAddr.isMulticastAddress()) - throw new IgniteSpiException("Invalid multicast group address: " + mcastAddr); - - Collection<String> locAddrs; - - try { - locAddrs = U.resolveLocalAddresses(U.resolveLocalHost(locAddr)).get1(); - } - catch (IOException | GridException e) { - throw new IgniteSpiException("Failed to resolve local addresses [locAddr=" + locAddr + ']', e); - } - - assert locAddrs != null; - - addrSnds = new ArrayList<>(locAddrs.size()); - - Collection<InetAddress> reqItfs = new ArrayList<>(locAddrs.size()); // Interfaces used to send requests. - - for (String locAddr : locAddrs) { - InetAddress addr; - - try { - addr = InetAddress.getByName(locAddr); - } - catch (UnknownHostException e) { - if (log.isDebugEnabled()) - log.debug("Failed to resolve local address [locAddr=" + locAddr + ", err=" + e + ']'); - - continue; - } - - if (!addr.isLoopbackAddress()) { - try { - addrSnds.add(new AddressSender(mcastAddr, addr, addrs)); - - reqItfs.add(addr); - } - catch (IOException e) { - if (log.isDebugEnabled()) - log.debug("Failed to create multicast socket [mcastAddr=" + mcastAddr + - ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ", locAddr=" + addr + - ", err=" + e + ']'); - } - } - } - - if (addrSnds.isEmpty()) { - try { - // Create non-bound socket if local host is loopback or failed to create sockets explicitly - // bound to interfaces. - addrSnds.add(new AddressSender(mcastAddr, null, addrs)); - } - catch (IOException e) { - throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr + - ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e); - } - } - - for (AddressSender addrSnd :addrSnds) - addrSnd.start(); - - Collection<InetSocketAddress> ret; - - if (reqItfs.size() > 1) { - ret = new HashSet<>(); - - Collection<AddressReceiver> rcvrs = new ArrayList<>(); - - for (InetAddress itf : reqItfs) { - AddressReceiver rcvr = new AddressReceiver(mcastAddr, itf); - - rcvr.start(); - - rcvrs.add(rcvr); - } - - for (AddressReceiver rcvr : rcvrs) { - try { - rcvr.join(); - - ret.addAll(rcvr.addresses()); - } - catch (InterruptedException ignore) { - U.warn(log, "Got interrupted while receiving address request."); - - Thread.currentThread().interrupt(); - - break; - } - } - } - else - ret = requestAddresses(mcastAddr, F.first(reqItfs)); - - if (!ret.isEmpty()) - registerAddresses(ret); - } - - /** {@inheritDoc} */ - @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException { - super.onSpiContextInitialized(spiCtx); - - spiCtx.registerPort(mcastPort, UDP); - } - - /** - * Sends multicast address request message and waits for reply. Response wait time and number - * of request attempts are configured as properties {@link #setResponseWaitTime} and - * {@link #setAddressRequestAttempts}. - * - * @param mcastAddr Multicast address where to send request. - * @param sockItf Optional interface multicast socket should be bound to. - * @return Collection of received addresses. - */ - private Collection<InetSocketAddress> requestAddresses(InetAddress mcastAddr, @Nullable InetAddress sockItf) { - Collection<InetSocketAddress> rmtAddrs = new HashSet<>(); - - try { - DatagramPacket reqPckt = new DatagramPacket(MSG_ADDR_REQ_DATA, MSG_ADDR_REQ_DATA.length, - mcastAddr, mcastPort); - - byte[] resData = new byte[AddressResponse.MAX_DATA_LENGTH]; - - DatagramPacket resPckt = new DatagramPacket(resData, resData.length); - - boolean sndError = false; - - for (int i = 0; i < addrReqAttempts; i++) { - MulticastSocket sock = null; - - try { - sock = new MulticastSocket(0); - - // Use 'false' to enable support for more than one node on the same machine. - sock.setLoopbackMode(false); - - if (sockItf != null) - sock.setInterface(sockItf); - - sock.setSoTimeout(resWaitTime); - - reqPckt.setData(MSG_ADDR_REQ_DATA); - - try { - sock.send(reqPckt); - } - catch (IOException e) { - if (!handleNetworkError(e)) - break; - - if (i < addrReqAttempts - 1) { - if (log.isDebugEnabled()) - log.debug("Failed to send multicast address request (will retry in 500 ms): " + e); - - U.sleep(500); - } - else { - if (log.isDebugEnabled()) - log.debug("Failed to send multicast address request: " + e); - } - - sndError = true; - - continue; - } - - long rcvEnd = U.currentTimeMillis() + resWaitTime; - - try { - while (U.currentTimeMillis() < rcvEnd) { // Try to receive multiple responses. - sock.receive(resPckt); - - byte[] data = resPckt.getData(); - - if (!U.bytesEqual(U.GG_HEADER, 0, data, 0, U.GG_HEADER.length)) { - U.error(log, "Failed to verify message header."); - - continue; - } - - AddressResponse addrRes; - - try { - addrRes = new AddressResponse(data); - } - catch (GridException e) { - LT.warn(log, e, "Failed to deserialize multicast response."); - - continue; - } - - rmtAddrs.addAll(addrRes.addresses()); - } - } - catch (SocketTimeoutException ignored) { - if (log.isDebugEnabled()) // DatagramSocket.receive timeout has expired. - log.debug("Address receive timeout."); - } - } - catch (IOException e) { - U.error(log, "Failed to request nodes addresses.", e); - } - finally { - U.close(sock); - } - - if (!rmtAddrs.isEmpty()) - break; - - if (i < addrReqAttempts - 1) // Wait some time before re-sending address request. - U.sleep(200); - } - - if (log.isDebugEnabled()) - log.debug("Received nodes addresses: " + rmtAddrs); - - if (rmtAddrs.isEmpty() && sndError) - U.quietAndWarn(log, "Failed to send multicast message (is multicast enabled on this node?)."); - - return rmtAddrs; - } - catch (GridInterruptedException ignored) { - U.warn(log, "Got interrupted while sending address request."); - - Thread.currentThread().interrupt(); - - return rmtAddrs; - } - } - - /** {@inheritDoc} */ - @Override public void close() { - for (AddressSender addrSnd : addrSnds) - U.interrupt(addrSnd); - - for (AddressSender addrSnd : addrSnds) - U.join(addrSnd, log); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridTcpDiscoveryMulticastIpFinder.class, this, "super", super.toString()); - } - - /** - * @param e Network error to handle. - * @return {@code True} if this error is recoverable and the operation can be retried. - */ - private boolean handleNetworkError(IOException e) { - if ("Network is unreachable".equals(e.getMessage()) && U.isMacOs()) { - U.warn(log, "Multicast does not work on Mac OS JVM loopback address (configure external IP address " + - "for 'localHost' configuration property)"); - - return false; - } - - return true; - } - - /** - * Response to multicast address request. - */ - private static class AddressResponse { - /** Maximum supported multicast message. */ - public static final int MAX_DATA_LENGTH = 64 * 1024; - - /** */ - private byte[] data; - - /** */ - private Collection<InetSocketAddress> addrs; - - /** - * @param addrs Addresses discovery SPI binds to. - * @throws GridException If marshalling failed. - */ - private AddressResponse(Collection<InetSocketAddress> addrs) throws GridException { - this.addrs = addrs; - - byte[] addrsData = marsh.marshal(addrs); - data = new byte[U.GG_HEADER.length + addrsData.length]; - - if (data.length > MAX_DATA_LENGTH) - throw new GridException("Too long data packet [size=" + data.length + ", max=" + MAX_DATA_LENGTH + "]"); - - System.arraycopy(U.GG_HEADER, 0, data, 0, U.GG_HEADER.length); - System.arraycopy(addrsData, 0, data, 4, addrsData.length); - } - - /** - * @param data Message data. - * @throws GridException If unmarshalling failed. - */ - private AddressResponse(byte[] data) throws GridException { - assert U.bytesEqual(U.GG_HEADER, 0, data, 0, U.GG_HEADER.length); - - this.data = data; - - addrs = marsh.unmarshal(Arrays.copyOfRange(data, U.GG_HEADER.length, data.length), null); - } - - /** - * @return Message data. - */ - byte[] data() { - return data; - } - - /** - * @return IP address discovery SPI binds to. - */ - public Collection<InetSocketAddress> addresses() { - return addrs; - } - } - - /** - * Thread sends multicast address request message and waits for reply. - */ - private class AddressReceiver extends IgniteSpiThread { - /** */ - private final InetAddress mcastAddr; - - /** */ - private final InetAddress sockAddr; - - /** */ - private Collection<InetSocketAddress> addrs; - - /** - * @param mcastAddr Multicast address where to send request. - * @param sockAddr Optional address multicast socket should be bound to. - */ - private AddressReceiver(InetAddress mcastAddr, InetAddress sockAddr) { - super(gridName, "tcp-disco-multicast-addr-rcvr", log); - this.mcastAddr = mcastAddr; - this.sockAddr = sockAddr; - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - addrs = requestAddresses(mcastAddr, sockAddr); - } - - /** - * @return Received addresses. - */ - Collection<InetSocketAddress> addresses() { - return addrs; - } - } - - /** - * Thread listening for multicast address requests and sending response - * containing socket address this node's discovery SPI listens to. - */ - private class AddressSender extends IgniteSpiThread { - /** */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private MulticastSocket sock; - - /** */ - private final InetAddress mcastGrp; - - /** */ - private final Collection<InetSocketAddress> addrs; - - /** */ - private final InetAddress sockItf; - - /** - * @param mcastGrp Multicast address. - * @param sockItf Optional interface multicast socket should be bound to. - * @param addrs Local node addresses. - * @throws IOException If fails to create multicast socket. - */ - private AddressSender(InetAddress mcastGrp, @Nullable InetAddress sockItf, Collection<InetSocketAddress> addrs) - throws IOException { - super(gridName, "tcp-disco-multicast-addr-sender", log); - this.mcastGrp = mcastGrp; - this.addrs = addrs; - this.sockItf = sockItf; - - sock = createSocket(); - } - - /** - * Creates multicast socket and joins multicast group. - * - * @throws IOException If fails to create socket or join multicast group. - * @return Multicast socket. - */ - private MulticastSocket createSocket() throws IOException { - MulticastSocket sock = new MulticastSocket(mcastPort); - - sock.setLoopbackMode(false); // Use 'false' to enable support for more than one node on the same machine. - - if (sockItf != null) - sock.setInterface(sockItf); - - if (sock.getLoopbackMode()) - U.warn(log, "Loopback mode is disabled which prevents nodes on the same machine from discovering " + - "each other."); - - sock.joinGroup(mcastGrp); - - return sock; - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - AddressResponse res; - - try { - res = new AddressResponse(addrs); - } - catch (GridException e) { - U.error(log, "Failed to prepare multicast message.", e); - - return; - } - - byte[] reqData = new byte[MSG_ADDR_REQ_DATA.length]; - - DatagramPacket pckt = new DatagramPacket(reqData, reqData.length); - - while (!isInterrupted()) { - try { - MulticastSocket sock; - - synchronized (this) { - if (isInterrupted()) - return; - - sock = this.sock; - - if (sock == null) - sock = createSocket(); - } - - sock.receive(pckt); - - if (!U.bytesEqual(U.GG_HEADER, 0, reqData, 0, U.GG_HEADER.length)) { - U.error(log, "Failed to verify message header."); - - continue; - } - - try { - sock.send(new DatagramPacket(res.data(), res.data().length, pckt.getAddress(), pckt.getPort())); - } - catch (IOException e) { - if (e.getMessage().contains("Operation not permitted")) { - if (log.isDebugEnabled()) - log.debug("Got 'operation not permitted' error, ignoring: " + e); - } - else - throw e; - } - } - catch (IOException e) { - if (!isInterrupted()) { - U.error(log, "Failed to send/receive address message (will try to reconnect).", e); - - synchronized (this) { - U.close(sock); - - sock = null; - } - } - } - } - } - - /** {@inheritDoc} */ - @Override public void interrupt() { - super.interrupt(); - - synchronized (this) { - U.close(sock); - - sock = null; - } - } - - /** {@inheritDoc} */ - @Override protected void cleanup() { - synchronized (this) { - U.close(sock); - - sock = null; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f31df32/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java new file mode 100644 index 0000000..acc20a4 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java @@ -0,0 +1,752 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast; + +import org.apache.ignite.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.grid.util.tostring.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +import static org.apache.ignite.IgniteSystemProperties.*; +import static org.apache.ignite.spi.IgnitePortProtocol.*; + +/** + * Multicast-based IP finder. + * <p> + * When TCP discovery starts this finder sends multicast request and waits + * for some time when others nodes reply to this request with messages containing + * their addresses (time IP finder waits for response and number of attempts to + * re-send multicast request in case if no replies are received can be configured, + * see {@link #setResponseWaitTime(int)} and {@link #setAddressRequestAttempts(int)}). + * <p> + * In addition to address received via multicast this finder can work with pre-configured + * list of addresses specified via {@link #setAddresses(Collection)} method. + * <h1 class="header">Configuration</h1> + * <h2 class="header">Mandatory</h2> + * There are no mandatory configuration parameters. + * <h2 class="header">Optional</h2> + * <ul> + * <li>Multicast IP address (see {@link #setMulticastGroup(String)}).</li> + * <li>Multicast port number (see {@link #setMulticastPort(int)}).</li> + * <li>Address response wait time (see {@link #setResponseWaitTime(int)}).</li> + * <li>Address request attempts (see {@link #setAddressRequestAttempts(int)}).</li> + * <li>Pre-configured addresses (see {@link #setAddresses(Collection)})</li> + * <li>Local address (see {@link #setLocalAddress(String)})</li> + * </ul> + */ +public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { + /** Default multicast IP address (value is {@code 228.1.2.4}). */ + public static final String DFLT_MCAST_GROUP = "228.1.2.4"; + + /** Default multicast port number (value is {@code 47400}). */ + public static final int DFLT_MCAST_PORT = 47400; + + /** Default time IP finder waits for reply to multicast address request (value is {@code 500}). */ + public static final int DFLT_RES_WAIT_TIME = 500; + + /** Default number of attempts to send multicast address request (value is {@code 2}). */ + public static final int DFLT_ADDR_REQ_ATTEMPTS = 2; + + /** Address request message data. */ + private static final byte[] MSG_ADDR_REQ_DATA = U.GG_HEADER; + + /** */ + private static final IgniteMarshaller marsh = new IgniteJdkMarshaller(); + + /** Grid logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Grid name. */ + @IgniteNameResource + @GridToStringExclude + private String gridName; + + /** Multicast IP address as string. */ + private String mcastGrp = DFLT_MCAST_GROUP; + + /** Multicast port number. */ + private int mcastPort = DFLT_MCAST_PORT; + + /** Time IP finder waits for reply to multicast address request. */ + private int resWaitTime = DFLT_RES_WAIT_TIME; + + /** Number of attempts to send multicast address request. */ + private int addrReqAttempts = DFLT_ADDR_REQ_ATTEMPTS; + + /** Local address */ + private String locAddr; + + /** */ + @GridToStringExclude + private Collection<AddressSender> addrSnds; + + /** + * Constructs new IP finder. + */ + public TcpDiscoveryMulticastIpFinder() { + setShared(true); + } + + /** + * Sets IP address of multicast group. + * <p> + * If not provided, default value is {@link #DFLT_MCAST_GROUP}. + * + * @param mcastGrp Multicast IP address. + */ + @IgniteSpiConfiguration(optional = true) + public void setMulticastGroup(String mcastGrp) { + this.mcastGrp = mcastGrp; + } + + /** + * Gets IP address of multicast group. + * + * @return Multicast IP address. + */ + public String getMulticastGroup() { + return mcastGrp; + } + + /** + * Sets port number which multicast messages are sent to. + * <p> + * If not provided, default value is {@link #DFLT_MCAST_PORT}. + * + * @param mcastPort Multicast port number. + */ + @IgniteSpiConfiguration(optional = true) + public void setMulticastPort(int mcastPort) { + this.mcastPort = mcastPort; + } + + /** + * Gets port number which multicast messages are sent to. + * + * @return Port number. + */ + public int getMulticastPort() { + return mcastPort; + } + + /** + * Sets time in milliseconds IP finder waits for reply to + * multicast address request. + * <p> + * If not provided, default value is {@link #DFLT_RES_WAIT_TIME}. + * + * @param resWaitTime Time IP finder waits for reply to multicast address request. + */ + @IgniteSpiConfiguration(optional = true) + public void setResponseWaitTime(int resWaitTime) { + this.resWaitTime = resWaitTime; + } + + /** + * Gets time in milliseconds IP finder waits for reply to + * multicast address request. + * + * @return Time IP finder waits for reply to multicast address request. + */ + public int getResponseWaitTime() { + return resWaitTime; + } + + /** + * Sets number of attempts to send multicast address request. IP finder re-sends + * request only in case if no reply for previous request is received. + * <p> + * If not provided, default value is {@link #DFLT_ADDR_REQ_ATTEMPTS}. + * + * @param addrReqAttempts Number of attempts to send multicast address request. + */ + @IgniteSpiConfiguration(optional = true) + public void setAddressRequestAttempts(int addrReqAttempts) { + this.addrReqAttempts = addrReqAttempts; + } + + /** + * Gets number of attempts to send multicast address request. IP finder re-sends + * request only in case if no reply for previous request is received. + * + * @return Number of attempts to send multicast address request. + */ + public int getAddressRequestAttempts() { + return addrReqAttempts; + } + + /** + * Sets local host address used by this IP finder. If provided address is non-loopback then multicast + * socket is bound to this interface. If local address is not set or is any local address then IP finder + * creates multicast sockets for all found non-loopback addresses. + * <p> + * If not provided then this property is initialized by the local address set in {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi} + * configuration. + * + * @param locAddr Local host address. + * @see org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi#setLocalAddress(String) + */ + @IgniteSpiConfiguration(optional = true) + public void setLocalAddress(String locAddr) { + this.locAddr = locAddr; + } + + /** + * Gets local address that multicast IP finder uses. + * + * @return Local address. + */ + public String getLocalAddress() { + return locAddr; + } + + /** {@inheritDoc} */ + @Override public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + // If GRIDGAIN_OVERRIDE_MCAST_GRP system property is set, use its value to override multicast group from + // configuration. Used for testing purposes. + String overrideMcastGrp = System.getProperty(GG_OVERRIDE_MCAST_GRP); + + if (overrideMcastGrp != null) + mcastGrp = overrideMcastGrp; + + if (F.isEmpty(mcastGrp)) + throw new IgniteSpiException("Multicast IP address is not specified."); + + if (mcastPort < 0 || mcastPort > 65535) + throw new IgniteSpiException("Invalid multicast port: " + mcastPort); + + if (resWaitTime <= 0) + throw new IgniteSpiException("Invalid wait time, value greater than zero is expected: " + resWaitTime); + + if (addrReqAttempts <= 0) + throw new IgniteSpiException("Invalid number of address request attempts, " + + "value greater than zero is expected: " + addrReqAttempts); + + if (F.isEmpty(getRegisteredAddresses())) + U.warn(log, "GridTcpDiscoveryMulticastIpFinder has no pre-configured addresses " + + "(it is recommended in production to specify at least one address in " + + "GridTcpDiscoveryMulticastIpFinder.getAddresses() configuration property)"); + + InetAddress mcastAddr; + + try { + mcastAddr = InetAddress.getByName(mcastGrp); + } + catch (UnknownHostException e) { + throw new IgniteSpiException("Unknown multicast group: " + mcastGrp, e); + } + + if (!mcastAddr.isMulticastAddress()) + throw new IgniteSpiException("Invalid multicast group address: " + mcastAddr); + + Collection<String> locAddrs; + + try { + locAddrs = U.resolveLocalAddresses(U.resolveLocalHost(locAddr)).get1(); + } + catch (IOException | GridException e) { + throw new IgniteSpiException("Failed to resolve local addresses [locAddr=" + locAddr + ']', e); + } + + assert locAddrs != null; + + addrSnds = new ArrayList<>(locAddrs.size()); + + Collection<InetAddress> reqItfs = new ArrayList<>(locAddrs.size()); // Interfaces used to send requests. + + for (String locAddr : locAddrs) { + InetAddress addr; + + try { + addr = InetAddress.getByName(locAddr); + } + catch (UnknownHostException e) { + if (log.isDebugEnabled()) + log.debug("Failed to resolve local address [locAddr=" + locAddr + ", err=" + e + ']'); + + continue; + } + + if (!addr.isLoopbackAddress()) { + try { + addrSnds.add(new AddressSender(mcastAddr, addr, addrs)); + + reqItfs.add(addr); + } + catch (IOException e) { + if (log.isDebugEnabled()) + log.debug("Failed to create multicast socket [mcastAddr=" + mcastAddr + + ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ", locAddr=" + addr + + ", err=" + e + ']'); + } + } + } + + if (addrSnds.isEmpty()) { + try { + // Create non-bound socket if local host is loopback or failed to create sockets explicitly + // bound to interfaces. + addrSnds.add(new AddressSender(mcastAddr, null, addrs)); + } + catch (IOException e) { + throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr + + ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e); + } + } + + for (AddressSender addrSnd :addrSnds) + addrSnd.start(); + + Collection<InetSocketAddress> ret; + + if (reqItfs.size() > 1) { + ret = new HashSet<>(); + + Collection<AddressReceiver> rcvrs = new ArrayList<>(); + + for (InetAddress itf : reqItfs) { + AddressReceiver rcvr = new AddressReceiver(mcastAddr, itf); + + rcvr.start(); + + rcvrs.add(rcvr); + } + + for (AddressReceiver rcvr : rcvrs) { + try { + rcvr.join(); + + ret.addAll(rcvr.addresses()); + } + catch (InterruptedException ignore) { + U.warn(log, "Got interrupted while receiving address request."); + + Thread.currentThread().interrupt(); + + break; + } + } + } + else + ret = requestAddresses(mcastAddr, F.first(reqItfs)); + + if (!ret.isEmpty()) + registerAddresses(ret); + } + + /** {@inheritDoc} */ + @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException { + super.onSpiContextInitialized(spiCtx); + + spiCtx.registerPort(mcastPort, UDP); + } + + /** + * Sends multicast address request message and waits for reply. Response wait time and number + * of request attempts are configured as properties {@link #setResponseWaitTime} and + * {@link #setAddressRequestAttempts}. + * + * @param mcastAddr Multicast address where to send request. + * @param sockItf Optional interface multicast socket should be bound to. + * @return Collection of received addresses. + */ + private Collection<InetSocketAddress> requestAddresses(InetAddress mcastAddr, @Nullable InetAddress sockItf) { + Collection<InetSocketAddress> rmtAddrs = new HashSet<>(); + + try { + DatagramPacket reqPckt = new DatagramPacket(MSG_ADDR_REQ_DATA, MSG_ADDR_REQ_DATA.length, + mcastAddr, mcastPort); + + byte[] resData = new byte[AddressResponse.MAX_DATA_LENGTH]; + + DatagramPacket resPckt = new DatagramPacket(resData, resData.length); + + boolean sndError = false; + + for (int i = 0; i < addrReqAttempts; i++) { + MulticastSocket sock = null; + + try { + sock = new MulticastSocket(0); + + // Use 'false' to enable support for more than one node on the same machine. + sock.setLoopbackMode(false); + + if (sockItf != null) + sock.setInterface(sockItf); + + sock.setSoTimeout(resWaitTime); + + reqPckt.setData(MSG_ADDR_REQ_DATA); + + try { + sock.send(reqPckt); + } + catch (IOException e) { + if (!handleNetworkError(e)) + break; + + if (i < addrReqAttempts - 1) { + if (log.isDebugEnabled()) + log.debug("Failed to send multicast address request (will retry in 500 ms): " + e); + + U.sleep(500); + } + else { + if (log.isDebugEnabled()) + log.debug("Failed to send multicast address request: " + e); + } + + sndError = true; + + continue; + } + + long rcvEnd = U.currentTimeMillis() + resWaitTime; + + try { + while (U.currentTimeMillis() < rcvEnd) { // Try to receive multiple responses. + sock.receive(resPckt); + + byte[] data = resPckt.getData(); + + if (!U.bytesEqual(U.GG_HEADER, 0, data, 0, U.GG_HEADER.length)) { + U.error(log, "Failed to verify message header."); + + continue; + } + + AddressResponse addrRes; + + try { + addrRes = new AddressResponse(data); + } + catch (GridException e) { + LT.warn(log, e, "Failed to deserialize multicast response."); + + continue; + } + + rmtAddrs.addAll(addrRes.addresses()); + } + } + catch (SocketTimeoutException ignored) { + if (log.isDebugEnabled()) // DatagramSocket.receive timeout has expired. + log.debug("Address receive timeout."); + } + } + catch (IOException e) { + U.error(log, "Failed to request nodes addresses.", e); + } + finally { + U.close(sock); + } + + if (!rmtAddrs.isEmpty()) + break; + + if (i < addrReqAttempts - 1) // Wait some time before re-sending address request. + U.sleep(200); + } + + if (log.isDebugEnabled()) + log.debug("Received nodes addresses: " + rmtAddrs); + + if (rmtAddrs.isEmpty() && sndError) + U.quietAndWarn(log, "Failed to send multicast message (is multicast enabled on this node?)."); + + return rmtAddrs; + } + catch (GridInterruptedException ignored) { + U.warn(log, "Got interrupted while sending address request."); + + Thread.currentThread().interrupt(); + + return rmtAddrs; + } + } + + /** {@inheritDoc} */ + @Override public void close() { + for (AddressSender addrSnd : addrSnds) + U.interrupt(addrSnd); + + for (AddressSender addrSnd : addrSnds) + U.join(addrSnd, log); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryMulticastIpFinder.class, this, "super", super.toString()); + } + + /** + * @param e Network error to handle. + * @return {@code True} if this error is recoverable and the operation can be retried. + */ + private boolean handleNetworkError(IOException e) { + if ("Network is unreachable".equals(e.getMessage()) && U.isMacOs()) { + U.warn(log, "Multicast does not work on Mac OS JVM loopback address (configure external IP address " + + "for 'localHost' configuration property)"); + + return false; + } + + return true; + } + + /** + * Response to multicast address request. + */ + private static class AddressResponse { + /** Maximum supported multicast message. */ + public static final int MAX_DATA_LENGTH = 64 * 1024; + + /** */ + private byte[] data; + + /** */ + private Collection<InetSocketAddress> addrs; + + /** + * @param addrs Addresses discovery SPI binds to. + * @throws GridException If marshalling failed. + */ + private AddressResponse(Collection<InetSocketAddress> addrs) throws GridException { + this.addrs = addrs; + + byte[] addrsData = marsh.marshal(addrs); + data = new byte[U.GG_HEADER.length + addrsData.length]; + + if (data.length > MAX_DATA_LENGTH) + throw new GridException("Too long data packet [size=" + data.length + ", max=" + MAX_DATA_LENGTH + "]"); + + System.arraycopy(U.GG_HEADER, 0, data, 0, U.GG_HEADER.length); + System.arraycopy(addrsData, 0, data, 4, addrsData.length); + } + + /** + * @param data Message data. + * @throws GridException If unmarshalling failed. + */ + private AddressResponse(byte[] data) throws GridException { + assert U.bytesEqual(U.GG_HEADER, 0, data, 0, U.GG_HEADER.length); + + this.data = data; + + addrs = marsh.unmarshal(Arrays.copyOfRange(data, U.GG_HEADER.length, data.length), null); + } + + /** + * @return Message data. + */ + byte[] data() { + return data; + } + + /** + * @return IP address discovery SPI binds to. + */ + public Collection<InetSocketAddress> addresses() { + return addrs; + } + } + + /** + * Thread sends multicast address request message and waits for reply. + */ + private class AddressReceiver extends IgniteSpiThread { + /** */ + private final InetAddress mcastAddr; + + /** */ + private final InetAddress sockAddr; + + /** */ + private Collection<InetSocketAddress> addrs; + + /** + * @param mcastAddr Multicast address where to send request. + * @param sockAddr Optional address multicast socket should be bound to. + */ + private AddressReceiver(InetAddress mcastAddr, InetAddress sockAddr) { + super(gridName, "tcp-disco-multicast-addr-rcvr", log); + this.mcastAddr = mcastAddr; + this.sockAddr = sockAddr; + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + addrs = requestAddresses(mcastAddr, sockAddr); + } + + /** + * @return Received addresses. + */ + Collection<InetSocketAddress> addresses() { + return addrs; + } + } + + /** + * Thread listening for multicast address requests and sending response + * containing socket address this node's discovery SPI listens to. + */ + private class AddressSender extends IgniteSpiThread { + /** */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private MulticastSocket sock; + + /** */ + private final InetAddress mcastGrp; + + /** */ + private final Collection<InetSocketAddress> addrs; + + /** */ + private final InetAddress sockItf; + + /** + * @param mcastGrp Multicast address. + * @param sockItf Optional interface multicast socket should be bound to. + * @param addrs Local node addresses. + * @throws IOException If fails to create multicast socket. + */ + private AddressSender(InetAddress mcastGrp, @Nullable InetAddress sockItf, Collection<InetSocketAddress> addrs) + throws IOException { + super(gridName, "tcp-disco-multicast-addr-sender", log); + this.mcastGrp = mcastGrp; + this.addrs = addrs; + this.sockItf = sockItf; + + sock = createSocket(); + } + + /** + * Creates multicast socket and joins multicast group. + * + * @throws IOException If fails to create socket or join multicast group. + * @return Multicast socket. + */ + private MulticastSocket createSocket() throws IOException { + MulticastSocket sock = new MulticastSocket(mcastPort); + + sock.setLoopbackMode(false); // Use 'false' to enable support for more than one node on the same machine. + + if (sockItf != null) + sock.setInterface(sockItf); + + if (sock.getLoopbackMode()) + U.warn(log, "Loopback mode is disabled which prevents nodes on the same machine from discovering " + + "each other."); + + sock.joinGroup(mcastGrp); + + return sock; + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + AddressResponse res; + + try { + res = new AddressResponse(addrs); + } + catch (GridException e) { + U.error(log, "Failed to prepare multicast message.", e); + + return; + } + + byte[] reqData = new byte[MSG_ADDR_REQ_DATA.length]; + + DatagramPacket pckt = new DatagramPacket(reqData, reqData.length); + + while (!isInterrupted()) { + try { + MulticastSocket sock; + + synchronized (this) { + if (isInterrupted()) + return; + + sock = this.sock; + + if (sock == null) + sock = createSocket(); + } + + sock.receive(pckt); + + if (!U.bytesEqual(U.GG_HEADER, 0, reqData, 0, U.GG_HEADER.length)) { + U.error(log, "Failed to verify message header."); + + continue; + } + + try { + sock.send(new DatagramPacket(res.data(), res.data().length, pckt.getAddress(), pckt.getPort())); + } + catch (IOException e) { + if (e.getMessage().contains("Operation not permitted")) { + if (log.isDebugEnabled()) + log.debug("Got 'operation not permitted' error, ignoring: " + e); + } + else + throw e; + } + } + catch (IOException e) { + if (!isInterrupted()) { + U.error(log, "Failed to send/receive address message (will try to reconnect).", e); + + synchronized (this) { + U.close(sock); + + sock = null; + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public void interrupt() { + super.interrupt(); + + synchronized (this) { + U.close(sock); + + sock = null; + } + } + + /** {@inheritDoc} */ + @Override protected void cleanup() { + synchronized (this) { + U.close(sock); + + sock = null; + } + } + } +}