http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioServer.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioServer.java index 1e61a17..ddd301b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioServer.java @@ -11,6 +11,7 @@ package org.gridgain.grid.util.nio; import org.apache.ignite.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; import org.apache.ignite.thread.*; import org.gridgain.grid.*; import org.gridgain.grid.util.*; @@ -329,7 +330,7 @@ public class GridNioServer<T> { NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg); - send0(impl, fut); + send0(impl, fut, false); return fut; } @@ -346,7 +347,7 @@ public class GridNioServer<T> { NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg); - send0(impl, fut); + send0(impl, fut, false); return fut; } @@ -354,19 +355,17 @@ public class GridNioServer<T> { /** * @param ses Session. * @param fut Future. + * @param sys System message flag. */ - private void send0(GridSelectorNioSessionImpl ses, NioOperationFuture<?> fut) { + private void send0(GridSelectorNioSessionImpl ses, NioOperationFuture<?> fut, boolean sys) { assert ses != null; assert fut != null; - int msgCnt = ses.offerFuture(fut); + int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut); if (ses.closed()) { - NioOperationFuture<?> fut0; - - // Cleanup as session.close() may have been already finished. 
- while ((fut0 = (NioOperationFuture<?>)ses.pollFuture()) != null) - fut0.connectionClosed(); + if (ses.removeFuture(fut)) + fut.connectionClosed(); } else if (msgCnt == 1) // Change from 0 to 1 means that worker thread should be waken up. @@ -374,6 +373,76 @@ public class GridNioServer<T> { } /** + * Adds message at the front of the queue without acquiring back pressure semaphore. + * + * @param ses Session. + * @param msg Message. + * @return Future. + */ + public GridNioFuture<?> sendSystem(GridNioSession ses, GridTcpCommunicationMessageAdapter msg) { + return sendSystem(ses, msg, null); + } + + /** + * Adds message at the front of the queue without acquiring back pressure semaphore. + * + * @param ses Session. + * @param msg Message. + * @param lsnr Future listener notified from the session thread. + * @return Future. + */ + public GridNioFuture<?> sendSystem(GridNioSession ses, + GridTcpCommunicationMessageAdapter msg, + @Nullable IgniteInClosure<? super GridNioFuture<?>> lsnr) { + assert ses instanceof GridSelectorNioSessionImpl; + + GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; + + NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg); + + if (lsnr != null) { + fut.listenAsync(lsnr); + + assert !fut.isDone(); + } + + send0(impl, fut, true); + + return fut; + } + + /** + * @param ses Session. 
+ */ + public void resend(GridNioSession ses) { + assert ses instanceof GridSelectorNioSessionImpl; + + GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor(); + + if (recoveryDesc != null && !recoveryDesc.messagesFutures().isEmpty()) { + Deque<GridNioFuture<?>> futs = recoveryDesc.messagesFutures(); + + if (log.isDebugEnabled()) + log.debug("Resend messages [rmtNode=" + recoveryDesc.node().id() + ", msgCnt=" + futs.size() + ']'); + + GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses; + + GridNioFuture<?> fut0 = futs.iterator().next(); + + for (GridNioFuture<?> fut : futs) { + fut.messageThread(true); + + ((NioOperationFuture)fut).resetMessage(ses0); + } + + ses0.resend(futs); + + // Wake up worker. + clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0)); + } + } + + /** * @param ses Session. * @param op Operation. * @return Future for operation. @@ -385,7 +454,8 @@ public class GridNioServer<T> { GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; if (impl.closed()) - return new GridNioFinishedFuture(new IOException("Failed to send message (connection was closed): " + ses)); + return new GridNioFinishedFuture(new IOException("Failed to pause/resume reads " + + "(connection was closed): " + ses)); NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, op); @@ -406,7 +476,7 @@ public class GridNioServer<T> { try { ch.configureBlocking(false); - NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, NioOperation.REGISTER, false, meta); + NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta); offerBalanced(req); @@ -437,7 +507,7 @@ public class GridNioServer<T> { /** * Gets configurable idle timeout for this session. If not set, default value is - * {@link org.apache.ignite.configuration.IgniteConfiguration#DFLT_REST_IDLE_TIMEOUT}. + * {@link IgniteConfiguration#DFLT_REST_IDLE_TIMEOUT}. * * @return Idle timeout in milliseconds. 
*/ @@ -1313,9 +1383,16 @@ public class GridNioServer<T> { readBuf.order(order); } - final GridSelectorNioSessionImpl ses = new GridSelectorNioSessionImpl(idx, filterChain, - (InetSocketAddress)sockCh.getLocalAddress(), (InetSocketAddress)sockCh.getRemoteAddress(), - req.accepted(), sndQueueLimit, writeBuf, readBuf); + final GridSelectorNioSessionImpl ses = new GridSelectorNioSessionImpl( + log, + idx, + filterChain, + (InetSocketAddress)sockCh.getLocalAddress(), + (InetSocketAddress)sockCh.getRemoteAddress(), + req.accepted(), + sndQueueLimit, + writeBuf, + readBuf); Map<Integer, ?> meta = req.meta(); @@ -1328,6 +1405,9 @@ public class GridNioServer<T> { ses.key(key); + if (!ses.accepted()) + resend(ses); + sessions.add(ses); try { @@ -1418,11 +1498,27 @@ public class GridNioServer<T> { // Since ses is in closed state, no write requests will be added. NioOperationFuture<?> fut = ses.removeMeta(NIO_OPERATION.ordinal()); - if (fut != null) - fut.connectionClosed(); + GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor(); - while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) - fut.connectionClosed(); + if (recovery != null) { + try { + // Poll will update recovery data. + while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) { + if (fut.skipRecovery()) + fut.connectionClosed(); + } + } + finally { + recovery.release(); + } + } + else { + if (fut != null) + fut.connectionClosed(); + + while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) + fut.connectionClosed(); + } return true; } @@ -1669,19 +1765,22 @@ public class GridNioServer<T> { * @param sockCh Socket channel to register on selector. */ NioOperationFuture(SocketChannel sockCh) { - this(sockCh, NioOperation.REGISTER, true, null); + this(sockCh, true, null); } /** * @param sockCh Socket channel. - * @param op Operation. * @param accepted {@code True} if socket has been accepted. * @param meta Optional meta. 
*/ - NioOperationFuture(SocketChannel sockCh, NioOperation op, boolean accepted, - @Nullable Map<Integer, ?> meta) { + NioOperationFuture( + SocketChannel sockCh, + boolean accepted, + @Nullable Map<Integer, ?> meta + ) { + op = NioOperation.REGISTER; + this.sockCh = sockCh; - this.op = op; this.accepted = accepted; this.meta = meta; } @@ -1761,6 +1860,17 @@ public class GridNioServer<T> { } /** + * @param ses New session instance. + */ + private void resetMessage(GridSelectorNioSessionImpl ses) { + assert commMsg != null; + + commMsg = commMsg.clone(); + + this.ses = ses; + } + + /** * @return Socket channel for register request. */ private SocketChannel socketChannel() { @@ -1799,6 +1909,11 @@ public class GridNioServer<T> { } /** {@inheritDoc} */ + @Override public boolean skipRecovery() { + return commMsg != null && commMsg.skipRecovery(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(NioOperationFuture.class, this); } @@ -1836,9 +1951,9 @@ public class GridNioServer<T> { /** {@inheritDoc} */ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) { if (directMode) { - boolean sslSystem = sslFilter != null && msg instanceof ByteBuffer; + boolean sslSys = sslFilter != null && msg instanceof ByteBuffer; - if (sslSystem) { + if (sslSys) { ConcurrentLinkedDeque8<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY); queue.offer((ByteBuffer)msg);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSession.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSession.java index e1600ba..27acfcc 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSession.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSession.java @@ -146,4 +146,14 @@ public interface GridNioSession { * @return {@code True} if reads are paused. */ public boolean readsPaused(); + + /** + * @param recoveryDesc Recovery descriptor. + */ + public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc); + + /** + * @return Recovery descriptor if recovery is supported, {@code null} otherwise. + */ + @Nullable public GridNioRecoveryDescriptor recoveryDescriptor(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSessionImpl.java index d81f98f..347cb72 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSessionImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSessionImpl.java @@ -288,6 +288,16 @@ public class GridNioSessionImpl implements GridNioSession { } /** {@inheritDoc} */ + @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() { + return null; + } + + /** {@inheritDoc} */ @Override public String 
toString() { return S.toString(GridNioSessionImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridSelectorNioSessionImpl.java index c44c6a9..db772dc 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridSelectorNioSessionImpl.java @@ -9,6 +9,7 @@ package org.gridgain.grid.util.nio; +import org.apache.ignite.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.grid.util.tostring.*; import org.jdk8.backport.*; @@ -17,6 +18,7 @@ import org.jetbrains.annotations.*; import java.net.*; import java.nio.*; import java.nio.channels.*; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -49,9 +51,16 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { /** Read buffer. */ private ByteBuffer readBuf; + /** Recovery data. */ + private GridNioRecoveryDescriptor recovery; + + /** Logger. */ + private final IgniteLogger log; + /** * Creates session instance. * + * @param log Logger. * @param selectorIdx Selector index for this session. * @param filterChain Filter chain that will handle requests. * @param locAddr Local address. @@ -62,6 +71,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { * @param readBuf Read buffer. 
*/ GridSelectorNioSessionImpl( + IgniteLogger log, int selectorIdx, GridNioFilterChain filterChain, InetSocketAddress locAddr, @@ -79,6 +89,10 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { assert locAddr != null : "GridSelectorNioSessionImpl should have local socket address."; assert rmtAddr != null : "GridSelectorNioSessionImpl should have remote socket address."; + assert log != null; + + this.log = log; + this.selectorIdx = selectorIdx; sem = sndQueueLimit > 0 ? new Semaphore(sndQueueLimit) : null; @@ -136,6 +150,22 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { } /** + * Adds write future at the front of the queue without acquiring back pressure semaphore. + * + * @param writeFut Write request. + * @return Updated size of the queue. + */ + int offerSystemFuture(GridNioFuture<?> writeFut) { + writeFut.messageThread(true); + + boolean res = queue.offerFirst(writeFut); + + assert res : "Future was not added to queue"; + + return queueSize.incrementAndGet(); + } + + /** * Adds write future to the pending list and returns the size of the queue. * <p> * Note that separate counter for the queue size is needed because in case of concurrent @@ -161,6 +191,21 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { } /** + * @param futs Futures to resend. + */ + void resend(Collection<GridNioFuture<?>> futs) { + assert queue.isEmpty() : queue.size(); + + boolean add = queue.addAll(futs); + + assert add; + + boolean set = queueSize.compareAndSet(0, futs.size()); + + assert set; + } + + /** * @return Message that is in the head of the queue, {@code null} if queue is empty. 
*/ @Nullable GridNioFuture<?> pollFuture() { @@ -171,12 +216,38 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { if (sem != null && !last.messageThread()) sem.release(); + + if (recovery != null) { + if (!recovery.add(last)) { + LT.warn(log, null, "Unacknowledged messages queue size overflow, will attempt to reconnect " + + "[remoteAddr=" + remoteAddress() + + ", queueLimit=" + recovery.queueLimit() + ']'); + + if (log.isDebugEnabled()) + log.debug("Unacknowledged messages queue size overflow, will attempt to reconnect " + + "[remoteAddr=" + remoteAddress() + + ", queueSize=" + recovery.messagesFutures().size() + + ", queueLimit=" + recovery.queueLimit() + ']'); + + close(); + } + } } return last; } /** + * @param fut Future. + * @return {@code True} if future was removed from queue. + */ + boolean removeFuture(GridNioFuture<?> fut) { + assert closed(); + + return queue.removeLastOccurrence(fut); + } + + /** * Gets number of write requests in a queue that have not been processed yet. * * @return Number of write requests. 
@@ -186,6 +257,32 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { } /** {@inheritDoc} */ + @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { + assert recoveryDesc != null; + + recovery = recoveryDesc; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() { + return recovery; + } + + /** {@inheritDoc} */ + @Override public <T> T addMeta(int key, @Nullable T val) { + if (val instanceof GridNioRecoveryDescriptor) { + recovery = (GridNioRecoveryDescriptor)val; + + if (!accepted()) + recovery.connected(); + + return null; + } + else + return super.addMeta(key, val); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridSelectorNioSessionImpl.class, this, super.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java index d5dec86..505c788 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java @@ -103,7 +103,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien } /** {@inheritDoc} */ - @Override public synchronized void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) + @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws GridException { if (closed()) throw new GridException("Communication client was closed: " + this); @@ -120,6 +120,8 @@ public class GridShmemCommunicationClient extends 
GridAbstractCommunicationClien } markUsed(); + + return false; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java index d1c8c5e..fbca363 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java @@ -182,7 +182,7 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient } /** {@inheritDoc} */ - @Override public void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) + @Override public boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws GridException { if (closed()) throw new GridException("Client was closed: " + this); @@ -199,6 +199,8 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient } markUsed(); + + return false; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java index 6d2a8f7..55997d3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java @@ -9,6 +9,7 @@ package org.gridgain.grid.util.nio; +import org.apache.ignite.*; import 
org.gridgain.grid.*; import org.gridgain.grid.util.direct.*; import org.jetbrains.annotations.*; @@ -23,27 +24,24 @@ import java.util.*; * Grid client for NIO server. */ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClient { - /** Socket. */ + /** Session. */ private final GridNioSession ses; - /** - * Constructor for test purposes only. - */ - public GridTcpNioCommunicationClient() { - super(null); - - ses = null; - } + /** Logger. */ + private final IgniteLogger log; /** * @param ses Session. + * @param log Logger. */ - public GridTcpNioCommunicationClient(GridNioSession ses) { + public GridTcpNioCommunicationClient(GridNioSession ses, IgniteLogger log) { super(null); assert ses != null; + assert log != null; this.ses = ses; + this.log = log; } /** @@ -98,14 +96,11 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie } /** {@inheritDoc} */ - @Override public void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) + @Override public boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws GridException { // Node ID is never provided in asynchronous send mode. 
assert nodeId == null; - if (closed()) - throw new GridException("Client was closed: " + this); - GridNioFuture<?> fut = ses.send(msg); if (fut.isDone()) { @@ -113,9 +108,23 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie fut.get(); } catch (IOException e) { - throw new GridException("Failed to send message [client=" + this + ']', e); + if (log.isDebugEnabled()) + log.debug("Failed to send message [client=" + this + ", err=" +e + ']'); + + return true; + } + catch (GridException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send message [client=" + this + ", err=" +e + ']'); + + if (e.getCause() instanceof IOException) + return true; + else + throw new GridException("Failed to send message [client=" + this + ']', e); } } + + return false; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java index dc2ee2a..86d68a7 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java @@ -9,20 +9,22 @@ package org.apache.ignite.spi.communication; -import mx4j.tools.adaptor.http.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; import org.gridgain.grid.util.direct.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.*; -import org.gridgain.testframework.config.*; import org.gridgain.testframework.junits.*; import 
org.gridgain.testframework.junits.spi.*; -import javax.management.*; +import java.net.*; import java.util.*; import java.util.Map.*; +import static org.gridgain.grid.kernal.GridNodeAttributes.*; + /** * Super class for all communication self tests. * @param <T> Type of communication SPI. @@ -47,22 +49,15 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS /** */ private static final Object mux = new Object(); - /** */ - private static final ObjectName mBeanName; - + /** + * + */ static { GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() { @Override public GridTcpCommunicationMessageAdapter create(byte type) { return new GridTestMessage(); } }, GridTestMessage.DIRECT_TYPE); - - try { - mBeanName = new ObjectName("mbeanAdaptor:protocol=HTTP"); - } - catch (MalformedObjectNameException e) { - throw new GridRuntimeException(e); - } } /** */ @@ -237,6 +232,36 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { + for (int i = 0; i < 3; i++) { + try { + startSpis(); + + break; + } + catch (GridException e) { + if (e.hasCause(BindException.class)) { + if (i < 2) { + info("Failed to start SPIs because of BindException, will retry after delay."); + + afterTestsStopped(); + + U.sleep(30_000); + } + else + throw e; + } + else + throw e; + } + } + } + + /** + * @throws Exception If failed. 
+ */ + private void startSpis() throws Exception { + U.setWorkDirectory(null, U.getGridGainHome()); + spis.clear(); nodes.clear(); spiRsrcs.clear(); @@ -246,10 +271,14 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS for (int i = 0; i < getSpiCount(); i++) { CommunicationSpi<GridTcpCommunicationMessageAdapter> spi = getSpi(i); - GridTestResources rsrcs = new GridTestResources(getMBeanServer(i)); + GridTestUtils.setFieldValue(spi, "gridName", "grid-" + i); + + GridTestResources rsrcs = new GridTestResources(); GridTestNode node = new GridTestNode(rsrcs.getNodeId()); + node.order(i); + GridSpiTestContext ctx = initSpiContext(); ctx.setLocalNode(node); @@ -263,6 +292,7 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS spi.setListener(new MessageListener(rsrcs.getNodeId())); node.setAttributes(spi.getNodeAttributes()); + node.setAttribute(ATTR_MACS, F.concat(U.allLocalMACs(), ", ")); nodes.add(node); @@ -284,38 +314,17 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS } } - /** - * @param idx Node index. - * @return Configured MBean server. - * @throws Exception If failed. 
- */ - private MBeanServer getMBeanServer(int idx) throws Exception { - HttpAdaptor mbeanAdaptor = new HttpAdaptor(); - - MBeanServer mbeanSrv = MBeanServerFactory.createMBeanServer(); - - mbeanAdaptor.setPort( - Integer.valueOf(GridTestProperties.getProperty("comm.mbeanserver.selftest.baseport")) + idx); - - mbeanSrv.registerMBean(mbeanAdaptor, mBeanName); - - mbeanAdaptor.start(); - - return mbeanSrv; - } - /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { for (CommunicationSpi<GridTcpCommunicationMessageAdapter> spi : spis.values()) { + spi.onContextDestroyed(); + spi.setListener(null); spi.spiStop(); } - for (GridTestResources rsrcs : spiRsrcs) { + for (GridTestResources rsrcs : spiRsrcs) rsrcs.stopThreads(); - - rsrcs.getMBeanServer().unregisterMBean(mBeanName); - } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java index 8015176..58cb184 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java @@ -86,14 +86,23 @@ public class GridTestMessage extends GridTcpCommunicationMessageAdapter { } /** {@inheritDoc} */ - @SuppressWarnings("CloneDoesntCallSuperClone") + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) @Override public GridTcpCommunicationMessageAdapter clone() { - throw new UnsupportedOperationException(); + GridTestMessage msg = new GridTestMessage(); + + clone0(msg); + + return msg; } /** {@inheritDoc} */ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - // No-op. 
+ GridTestMessage _clone = (GridTestMessage)_msg; + + _clone.srcNodeId = srcNodeId; + _clone.msgId = msgId; + _clone.resId = resId; + _clone.payload = payload; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java index 16c7ea0..1dabf5d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java @@ -29,9 +29,6 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica public static final int IDLE_CONN_TIMEOUT = 2000; /** */ - private boolean tcpNoDelay; - - /** */ private final boolean useShmem; /** @@ -50,11 +47,18 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica spi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); spi.setIdleConnectionTimeout(IDLE_CONN_TIMEOUT); - spi.setTcpNoDelay(tcpNoDelay); + spi.setTcpNoDelay(tcpNoDelay()); return spi; } + /** + * @return Value of property '{@link TcpCommunicationSpi#isTcpNoDelay()}'. 
+ */ + protected boolean tcpNoDelay() { + return true; + } + /** {@inheritDoc} */ @Override protected int getSpiCount() { return SPI_COUNT; @@ -68,21 +72,12 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica for (CommunicationSpi spi : spis.values()) { ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients"); - assert clients.size() == 2; + assertEquals(2, clients.size()); clients.put(UUID.randomUUID(), F.first(clients.values())); } } - /** - * @throws Exception If failed. - */ - public void testTcpNoDelay() throws Exception { - tcpNoDelay = true; - - super.testSendToManyNodes(); - } - /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); @@ -91,7 +86,8 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients"); for (int i = 0; i < 10 && !clients.isEmpty(); i++) { - U.warn(log, "Check failed for SPI: " + spi); + info("Check failed for SPI [grid=" + GridTestUtils.getFieldValue(spi, "gridName") + + ", spi=" + spi + ']'); U.sleep(1000); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java new file mode 100644 index 0000000..ffe5d57 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -0,0 +1,398 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ 
____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.communication.*; +import org.eclipse.jetty.util.*; +import org.gridgain.grid.*; +import org.gridgain.grid.util.direct.*; +import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.nio.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.*; +import org.gridgain.testframework.junits.spi.*; + +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") +public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends CommunicationSpi> + extends GridSpiAbstractTest<T> { + /** */ + private static final int SPI_CNT = 2; + + /** */ + private static final int ITERS = 50; + + /** */ + private static final Collection<GridTestResources> spiRsrcs = new ArrayList<>(); + + /** */ + protected static final List<CommunicationSpi<GridTcpCommunicationMessageAdapter>> spis = new ArrayList<>(); + + /** */ + protected static final List<ClusterNode> nodes = new ArrayList<>(); + + /** */ + private static int port = 60_000; + + /** + * + */ + static { + GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() { + @Override + public GridTcpCommunicationMessageAdapter create(byte type) { + return new GridTestMessage(); + } + }, GridTestMessage.DIRECT_TYPE); + } + + /** + * Disable SPI auto-start. 
+ */ + public GridTcpCommunicationSpiConcurrentConnectSelfTest() { + super(false); + } + + /** + * + */ + private static class MessageListener implements CommunicationListener<GridTcpCommunicationMessageAdapter> { + /** */ + private final CountDownLatch latch; + + /** */ + private final AtomicInteger cntr = new AtomicInteger(); + + /** */ + private final ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>(); + + /** + * @param latch Latch. + */ + MessageListener(CountDownLatch latch) { + this.latch = latch; + } + + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, GridTcpCommunicationMessageAdapter msg, IgniteRunnable msgC) { + msgC.run(); + + assertTrue(msg instanceof GridTestMessage); + + cntr.incrementAndGet(); + + GridTestMessage msg0 = (GridTestMessage)msg; + + assertEquals(nodeId, msg0.getSourceNodeId()); + + assertTrue(msgIds.add(msg0.getMsgId())); + + latch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(UUID nodeId) { + // No-op. + } + } + + /** + * @throws Exception If failed. + */ + public void testTwoThreads() throws Exception { + concurrentConnect(2, 10, ITERS, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testMultithreaded() throws Exception { + int threads = Runtime.getRuntime().availableProcessors() * 5; + + concurrentConnect(threads, 10, ITERS, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testWithLoad() throws Exception { + int threads = Runtime.getRuntime().availableProcessors() * 5; + + concurrentConnect(threads, 10, ITERS / 2, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testRandomSleep() throws Exception { + concurrentConnect(4, 1, ITERS, true, false); + } + + /** + * @param threads Number of threads. + * @param msgPerThread Messages per thread. + * @param iters Number of iterations. + * @param sleep If {@code true} sleeps random time before starts send messages. 
+ * @param load Run load threads flag. + * @throws Exception If failed. + */ + private void concurrentConnect(final int threads, + final int msgPerThread, + final int iters, + final boolean sleep, + boolean load) throws Exception { + log.info("Concurrent connect [threads=" + threads + + ", msgPerThread=" + msgPerThread + + ", iters=" + iters + + ", load=" + load + + ", sleep=" + sleep + ']'); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteFuture<?> loadFut = null; + + if (load) { + loadFut = GridTestUtils.runMultiThreadedAsync(new Callable<Long>() { + @Override + public Long call() throws Exception { + long dummyRes = 0; + + List<String> list = new ArrayList<>(); + + while (!stop.get()) { + for (int i = 0; i < 100; i++) { + String str = new String(new byte[i]); + + list.add(str); + + dummyRes += str.hashCode(); + } + + if (list.size() > 1000_000) { + list = new ArrayList<>(); + + System.gc(); + } + } + + return dummyRes; + } + }, 2, "test-load"); + } + + try { + for (int i = 0; i < iters; i++) { + log.info("Iteration: " + i); + + final AtomicInteger msgId = new AtomicInteger(); + + final int expMsgs = threads * msgPerThread; + + CountDownLatch latch = new CountDownLatch(expMsgs); + + MessageListener lsnr = new MessageListener(latch); + + createSpis(lsnr); + + final AtomicInteger idx = new AtomicInteger(); + + try { + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + int idx0 = idx.getAndIncrement(); + + Thread.currentThread().setName("Test thread [idx=" + idx0 + ", grid=" + (idx0 % 2) + ']'); + + CommunicationSpi<GridTcpCommunicationMessageAdapter> spi = spis.get(idx0 % 2); + + ClusterNode srcNode = nodes.get(idx0 % 2); + + ClusterNode dstNode = nodes.get((idx0 + 1) % 2); + + if (sleep) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + long millis = rnd.nextLong(10); + + if (millis > 0) + Thread.sleep(millis); + } + + for (int i = 0; i < msgPerThread; i++) + spi.sendMessage(dstNode, 
new GridTestMessage(srcNode.id(), msgId.incrementAndGet(), 0)); + + return null; + } + }, threads, "test"); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + for (CommunicationSpi spi : spis) { + ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients"); + + assertEquals(1, clients.size()); + + final GridNioServer srv = U.field(spi, "nioSrvr"); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + Collection sessions = U.field(srv, "sessions"); + + return sessions.size() == 1; + } + }, 5000); + + Collection sessions = U.field(srv, "sessions"); + + assertEquals(1, sessions.size()); + } + + assertEquals(expMsgs, lsnr.cntr.get()); + } + finally { + stopSpis(); + } + } + } + finally { + stop.set(true); + + if (loadFut != null) + loadFut.get(); + } + } + + /** + * @return SPI. + */ + private CommunicationSpi createSpi() { + TcpCommunicationSpi spi = new TcpCommunicationSpi(); + + spi.setLocalAddress("127.0.0.1"); + spi.setSharedMemoryPort(-1); + spi.setLocalPort(port++); + spi.setIdleConnectionTimeout(60_000); + spi.setConnectTimeout(10_000); + + return spi; + } + + /** + * @param lsnr Message listener. + * @throws Exception If failed. 
+ */ + private void startSpis(MessageListener lsnr) throws Exception { + spis.clear(); + nodes.clear(); + spiRsrcs.clear(); + + Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>(); + + for (int i = 0; i < SPI_CNT; i++) { + CommunicationSpi<GridTcpCommunicationMessageAdapter> spi = createSpi(); + + GridTestUtils.setFieldValue(spi, "gridName", "grid-" + i); + + GridTestResources rsrcs = new GridTestResources(); + + GridTestNode node = new GridTestNode(rsrcs.getNodeId()); + + node.order(i + 1); + + GridSpiTestContext ctx = initSpiContext(); + + ctx.setLocalNode(node); + + info(">>> Initialized context: nodeId=" + ctx.localNode().id()); + + spiRsrcs.add(rsrcs); + + rsrcs.inject(spi); + + spi.setListener(lsnr); + + node.setAttributes(spi.getNodeAttributes()); + + nodes.add(node); + + spi.spiStart(getTestGridName() + (i + 1)); + + spis.add(spi); + + spi.onContextInitialized(ctx); + + ctxs.put(node, ctx); + } + + // For each context set remote nodes. + for (Map.Entry<ClusterNode, GridSpiTestContext> e : ctxs.entrySet()) { + for (ClusterNode n : nodes) { + if (!n.equals(e.getKey())) + e.getValue().remoteNodes().add(n); + } + } + } + + /** + * @param lsnr Message listener. + * @throws Exception If failed. + */ + private void createSpis(MessageListener lsnr) throws Exception { + for (int i = 0; i < 3; i++) { + try { + startSpis(lsnr); + + break; + } + catch (GridException e) { + if (e.hasCause(BindException.class)) { + if (i < 2) { + info("Failed to start SPIs because of BindException, will retry after delay."); + + stopSpis(); + + U.sleep(10_000); + } + else + throw e; + } + else + throw e; + } + } + } + + /** + * @throws Exception If failed. 
+ */ + private void stopSpis() throws Exception { + for (CommunicationSpi<GridTcpCommunicationMessageAdapter> spi : spis) { + spi.onContextDestroyed(); + + spi.setListener(null); + + spi.spiStop(); + } + + for (GridTestResources rsrcs : spiRsrcs) + rsrcs.stopThreads(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java index 833c20d..ba34ddd 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java @@ -37,5 +37,9 @@ public class GridTcpCommunicationSpiConfigSelfTest extends GridSpiAbstractConfig checkNegativeSpiProperty(new TcpCommunicationSpi(), "bufferSizeRatio", 0); checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectTimeout", -1); checkNegativeSpiProperty(new TcpCommunicationSpi(), "maxConnectTimeout", -1); + checkNegativeSpiProperty(new TcpCommunicationSpi(), "socketWriteTimeout", -1); + checkNegativeSpiProperty(new TcpCommunicationSpi(), "ackSendThreshold", 0); + checkNegativeSpiProperty(new TcpCommunicationSpi(), "ackSendThreshold", -1); + checkNegativeSpiProperty(new TcpCommunicationSpi(), "unacknowledgedMessagesBufferSize", -1); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java index c49f104..0d650fb 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java @@ -9,27 +9,27 @@ package org.apache.ignite.spi.communication.tcp; -import mx4j.tools.adaptor.http.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; import org.apache.ignite.spi.communication.*; import org.gridgain.grid.util.direct.*; +import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.nio.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.*; -import org.gridgain.testframework.config.*; import org.gridgain.testframework.junits.*; import org.gridgain.testframework.junits.spi.*; import org.jdk8.backport.*; -import javax.management.*; import java.util.*; import java.util.Map.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static org.gridgain.grid.kernal.GridNodeAttributes.*; + /** * Class for multithreaded {@link TcpCommunicationSpi} test. */ @@ -60,9 +60,6 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS /** Initialized nodes */ private static final List<ClusterNode> nodes = new ArrayList<>(); - /** */ - private static final ObjectName mBeanName; - /** Flag indicating if listener should reject messages. 
*/ private static boolean reject; @@ -72,13 +69,6 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS return new GridTestMessage(); } }, GridTestMessage.DIRECT_TYPE); - - try { - mBeanName = new ObjectName("mbeanAdaptor:protocol=HTTP"); - } - catch (MalformedObjectNameException e) { - throw new GridRuntimeException(e); - } } /** @@ -168,8 +158,7 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS assertEquals("Invalid listener count", getSpiCount(), lsnrs.size()); - final ConcurrentMap<UUID, ConcurrentLinkedDeque8<GridTestMessage>> msgs = - new ConcurrentHashMap<>(); + final ConcurrentMap<UUID, ConcurrentLinkedDeque8<GridTestMessage>> msgs = new ConcurrentHashMap<>(); final int iterationCnt = 5000; @@ -331,6 +320,28 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS run.set(false); fut2.get(); + + // Wait when all messages are acknowledged to do not break next tests' logic. + for (CommunicationSpi<GridTcpCommunicationMessageAdapter> spi : spis.values()) { + GridNioServer srv = U.field(spi, "nioSrvr"); + + Collection<? 
extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + for (GridNioSession ses : sessions) { + final GridNioRecoveryDescriptor snd = ses.recoveryDescriptor(); + + if (snd != null) { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return snd.messagesFutures().isEmpty(); + } + }, 10_000); + + assertEquals("Unexpected messages: " + snd.messagesFutures(), 0, + snd.messagesFutures().size()); + } + } + } } /** @@ -415,6 +426,8 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { + U.setWorkDirectory(null, U.getGridGainHome()); + spis.clear(); nodes.clear(); spiRsrcs.clear(); @@ -425,10 +438,14 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS for (int i = 0; i < getSpiCount(); i++) { CommunicationSpi<GridTcpCommunicationMessageAdapter> spi = newCommunicationSpi(); - GridTestResources rsrcs = new GridTestResources(getMBeanServer(i)); + GridTestUtils.setFieldValue(spi, "gridName", "grid-" + i); + + GridTestResources rsrcs = new GridTestResources(); GridTestNode node = new GridTestNode(rsrcs.getNodeId()); + node.order(i); + GridSpiTestContext ctx = initSpiContext(); ctx.setLocalNode(node); @@ -448,6 +465,7 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS info("Lsnrs: " + lsnrs); node.setAttributes(spi.getNodeAttributes()); + node.setAttribute(ATTR_MACS, F.concat(U.allLocalMACs(), ", ")); nodes.add(node); @@ -491,40 +509,19 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS } } - /** - * @param idx Node index. - * @return Configured MBean server. - * @throws Exception If failed. 
- */ - private MBeanServer getMBeanServer(int idx) throws Exception { - HttpAdaptor mbeanAdaptor = new HttpAdaptor(); - - MBeanServer mbeanSrv = MBeanServerFactory.createMBeanServer(); - - mbeanAdaptor.setPort( - Integer.valueOf(GridTestProperties.getProperty("comm.mbeanserver.selftest.baseport")) + idx); - - mbeanSrv.registerMBean(mbeanAdaptor, mBeanName); - - mbeanAdaptor.start(); - - return mbeanSrv; - } - /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { for (CommunicationSpi<GridTcpCommunicationMessageAdapter> spi : spis.values()) { + spi.onContextDestroyed(); + spi.setListener(null); spi.spiStop(); } - for (GridTestResources rsrcs : spiRsrcs) { + for (GridTestResources rsrcs : spiRsrcs) rsrcs.stopThreads(); - rsrcs.getMBeanServer().unregisterMBean(mBeanName); - } - lsnrs.clear(); spiRsrcs.clear(); spis.clear(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java index 98e614e..70334ee 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java @@ -15,6 +15,6 @@ package org.apache.ignite.spi.communication.tcp; public class GridTcpCommunicationSpiMultithreadedShmemTest extends GridTcpCommunicationSpiMultithreadedSelfTest { /** */ public GridTcpCommunicationSpiMultithreadedShmemTest() { - super(true); + super(false); } } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java new file mode 100644 index 0000000..b20e50e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -0,0 +1,426 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.communication.*; +import org.eclipse.jetty.util.*; +import org.gridgain.grid.*; +import org.gridgain.grid.util.direct.*; +import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.nio.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.*; +import org.gridgain.testframework.junits.spi.*; + +import java.net.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") +public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> { + /** */ + private static final Collection<GridTestResources> spiRsrcs = new ArrayList<>(); + + /** */ + protected static final List<TcpCommunicationSpi> spis = new 
ArrayList<>(); + + /** */ + protected static final List<ClusterNode> nodes = new ArrayList<>(); + + /** */ + private static final int SPI_CNT = 2; + + /** + * + */ + static { + GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() { + @Override + public GridTcpCommunicationMessageAdapter create(byte type) { + return new GridTestMessage(); + } + }, GridTestMessage.DIRECT_TYPE); + } + + /** + * Disable SPI auto-start. + */ + public GridTcpCommunicationSpiRecoveryAckSelfTest() { + super(false); + } + + /** */ + @SuppressWarnings({"deprecation"}) + private class TestListener implements CommunicationListener<GridTcpCommunicationMessageAdapter> { + /** */ + private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>(); + + /** */ + private AtomicInteger rcvCnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, GridTcpCommunicationMessageAdapter msg, IgniteRunnable msgC) { + info("Test listener received message: " + msg); + + assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage); + + GridTestMessage msg0 = (GridTestMessage)msg; + + assertTrue("Duplicated message received: " + msg0, msgIds.add(msg0.getMsgId())); + + rcvCnt.incrementAndGet(); + + msgC.run(); + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(UUID nodeId) { + // No-op. + } + } + + /** + * @throws Exception If failed. + */ + public void testAckOnIdle() throws Exception { + checkAck(10, 2000, 9); + } + + /** + * @throws Exception If failed. + */ + public void testAckOnCount() throws Exception { + checkAck(10, 60_000, 10); + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param msgPerIter Messages per iteration. + * @throws Exception If failed. 
+ */ + private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception { + createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT); + + try { + TcpCommunicationSpi spi0 = spis.get(0); + TcpCommunicationSpi spi1 = spis.get(1); + + ClusterNode node0 = nodes.get(0); + ClusterNode node1 = nodes.get(1); + + int msgId = 0; + + int expMsgs = 0; + + for (int i = 0; i < 5; i++) { + info("Iteration: " + i); + + for (int j = 0; j < msgPerIter; j++) { + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + + spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0)); + } + + expMsgs += msgPerIter; + + for (TcpCommunicationSpi spi : spis) { + GridNioServer srv = U.field(spi, "nioSrvr"); + + Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + assertFalse(sessions.isEmpty()); + + boolean found = false; + + for (GridNioSession ses : sessions) { + final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor(); + + if (recoveryDesc != null) { + found = true; + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return recoveryDesc.messagesFutures().isEmpty(); + } + }, 10_000); + + assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0, + recoveryDesc.messagesFutures().size()); + + break; + } + } + + assertTrue(found); + } + + final int expMsgs0 = expMsgs; + + for (TcpCommunicationSpi spi : spis) { + final TestListener lsnr = (TestListener)spi.getListener(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override + public boolean apply() { + return lsnr.rcvCnt.get() >= expMsgs0; + } + }, 5000); + + assertEquals(expMsgs, lsnr.rcvCnt.get()); + } + } + } + finally { + stopSpis(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testQueueOverflow() throws Exception { + for (int i = 0; i < 3; i++) { + try { + startSpis(5, 60_000, 10); + + checkOverflow(); + + break; + } + catch (GridException e) { + if (e.hasCause(BindException.class)) { + if (i < 2) { + info("Got exception caused by BindException, will retry after delay: " + e); + + stopSpis(); + + U.sleep(10_000); + } + else + throw e; + } + else + throw e; + } + finally { + stopSpis(); + } + } + } + + /** + * @throws Exception If failed. + */ + private void checkOverflow() throws Exception { + TcpCommunicationSpi spi0 = spis.get(0); + TcpCommunicationSpi spi1 = spis.get(1); + + ClusterNode node0 = nodes.get(0); + ClusterNode node1 = nodes.get(1); + + final GridNioServer srv1 = U.field(spi1, "nioSrvr"); + + int msgId = 0; + + // Send message to establish connection. + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + + // Prevent node1 from send + GridTestUtils.setFieldValue(srv1, "skipWrite", true); + + final GridNioSession ses0 = communicationSession(spi0); + + for (int i = 0; i < 150; i++) + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + + // Wait when session is closed because of queue overflow. + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return ses0.closeTime() != 0; + } + }, 5000); + + assertTrue("Failed to wait for session close", ses0.closeTime() != 0); + + GridTestUtils.setFieldValue(srv1, "skipWrite", false); + + for (int i = 0; i < 100; i++) + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + + final int expMsgs = 251; + + final TestListener lsnr = (TestListener)spi1.getListener(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override + public boolean apply() { + return lsnr.rcvCnt.get() >= expMsgs; + } + }, 5000); + + assertEquals(expMsgs, lsnr.rcvCnt.get()); + } + + /** + * @param spi SPI. + * @return Session. + * @throws Exception If failed. 
+ */ + @SuppressWarnings("unchecked") + private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception { + final GridNioServer srv = U.field(spi, "nioSrvr"); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override + public boolean apply() { + Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + return !sessions.isEmpty(); + } + }, 5000); + + Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + assertEquals(1, sessions.size()); + + return sessions.iterator().next(); + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param queueLimit Message queue limit. + * @return SPI instance. + */ + protected TcpCommunicationSpi getSpi(int ackCnt, int idleTimeout, int queueLimit) { + TcpCommunicationSpi spi = new TcpCommunicationSpi(); + + spi.setSharedMemoryPort(-1); + spi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); + spi.setIdleConnectionTimeout(idleTimeout); + spi.setTcpNoDelay(true); + spi.setAckSendThreshold(ackCnt); + spi.setMessageQueueLimit(queueLimit); + + return spi; + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param queueLimit Message queue limit. + * @throws Exception If failed. 
+ */ + private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception { + spis.clear(); + nodes.clear(); + spiRsrcs.clear(); + + Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>(); + + for (int i = 0; i < SPI_CNT; i++) { + TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit); + + GridTestUtils.setFieldValue(spi, "gridName", "grid-" + i); + + GridTestResources rsrcs = new GridTestResources(); + + GridTestNode node = new GridTestNode(rsrcs.getNodeId()); + + GridSpiTestContext ctx = initSpiContext(); + + ctx.setLocalNode(node); + + spiRsrcs.add(rsrcs); + + rsrcs.inject(spi); + + spi.setListener(new TestListener()); + + node.setAttributes(spi.getNodeAttributes()); + + nodes.add(node); + + spi.spiStart(getTestGridName() + (i + 1)); + + spis.add(spi); + + spi.onContextInitialized(ctx); + + ctxs.put(node, ctx); + } + + // For each context set remote nodes. + for (Map.Entry<ClusterNode, GridSpiTestContext> e : ctxs.entrySet()) { + for (ClusterNode n : nodes) { + if (!n.equals(e.getKey())) + e.getValue().remoteNodes().add(n); + } + } + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param queueLimit Message queue limit. + * @throws Exception If failed. + */ + private void createSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception { + for (int i = 0; i < 3; i++) { + try { + startSpis(ackCnt, idleTimeout, queueLimit); + + break; + } + catch (GridException e) { + if (e.hasCause(BindException.class)) { + if (i < 2) { + info("Failed to start SPIs because of BindException, will retry after delay."); + + stopSpis(); + + U.sleep(10_000); + } + else + throw e; + } + else + throw e; + } + } + } + + /** + * @throws Exception If failed. 
+ */ + private void stopSpis() throws Exception { + for (CommunicationSpi<GridTcpCommunicationMessageAdapter> spi : spis) { + spi.onContextDestroyed(); + + spi.setListener(null); + + spi.spiStop(); + } + + for (GridTestResources rsrcs : spiRsrcs) + rsrcs.stopThreads(); + + spis.clear(); + nodes.clear(); + spiRsrcs.clear(); + } +}