Repository: incubator-ignite Updated Branches: refs/heads/ignite-1169 [created] 325458125
IGNITE-1169 Implement on TcpCommunicationSpi sendWithAck methods. Added tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/32545812 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/32545812 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/32545812 Branch: refs/heads/ignite-1169 Commit: 325458125fe5cb34c1cf79b3e7ece161b6cff05c Parents: f82fb5c Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Wed Jul 29 19:50:10 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Wed Jul 29 19:09:00 2015 +0300 ---------------------------------------------------------------------- .../util/nio/GridCommunicationClient.java | 11 + .../util/nio/GridNioRecoveryDescriptor.java | 54 ++- .../util/nio/GridShmemCommunicationClient.java | 8 + .../util/nio/GridTcpNioCommunicationClient.java | 32 ++ .../communication/tcp/TcpCommunicationSpi.java | 70 ++- ...mmunicationSpiRecoveryAckFutureSelfTest.java | 447 +++++++++++++++++++ .../IgniteSpiCommunicationSelfTestSuite.java | 1 + 7 files changed, 619 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java index 693a5a4..0403272 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; @@ -100,6 +101,16 @@ public interface GridCommunicationClient { public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException; /** + * @param nodeId Node ID (provided only if versions of local and remote nodes are different). + * @param msg Message to send. + * @param fut Future which done when will be received ack on the message. + * @throws IgniteCheckedException If failed. + * @return {@code True} if should try to resend message. + */ + public boolean sendMessageWithAck(@Nullable UUID nodeId, Message msg, GridFutureAdapter<Boolean> fut) + throws IgniteCheckedException; + + /** * @return {@code True} if send is asynchronous. */ public boolean async(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 733ae81..66ae60f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; @@ -36,6 +37,9 @@ public class GridNioRecoveryDescriptor { /** Unacknowledged message futures. */ private final ArrayDeque<GridNioFuture<?>> msgFuts; + /** Unacknowledged message futures. */ + private final Map<GridNioFuture<?>, GridFutureAdapter<Boolean>> ackFuts; + /** Number of messages to resend. */ private int resendCnt; @@ -79,6 +83,7 @@ public class GridNioRecoveryDescriptor { assert queueLimit > 0; msgFuts = new ArrayDeque<>(queueLimit); + ackFuts = new HashMap<>(queueLimit); this.queueLimit = queueLimit; this.node = node; @@ -166,6 +171,16 @@ public class GridNioRecoveryDescriptor { } /** + * @param nioFut fut NIO future. + * @param fut ack future. + */ + public void add(GridNioFuture<?> nioFut, GridFutureAdapter<Boolean> fut) { + assert fut != null; + + ackFuts.put(nioFut, fut); + } + + /** * @param rcvCnt Number of messages received by remote node. */ public void ackReceived(long rcvCnt) { @@ -173,6 +188,8 @@ public class GridNioRecoveryDescriptor { log.debug("Handle acknowledgment [acked=" + acked + ", rcvCnt=" + rcvCnt + ", msgFuts=" + msgFuts.size() + ']'); + GridFutureAdapter<Boolean> ackFut; + while (acked < rcvCnt) { GridNioFuture<?> fut = msgFuts.pollFirst(); @@ -183,6 +200,12 @@ public class GridNioRecoveryDescriptor { assert fut.isDone() : fut; acked++; + + if (!ackFuts.isEmpty() && (ackFut = ackFuts.get(fut)) != null) { + ackFut.onDone(true); + + ackFuts.remove(fut); + } } } @@ -191,6 +214,7 @@ public class GridNioRecoveryDescriptor { */ public void onNodeLeft() { GridNioFuture<?>[] futs = null; + GridFutureAdapter<?>[] akFuts = null; synchronized (this) { nodeLeft = true; @@ -200,10 +224,16 @@ public class GridNioRecoveryDescriptor { msgFuts.clear(); } + + if (!reserved && !ackFuts.isEmpty()) { + akFuts = ackFuts.values().toArray(new GridFutureAdapter<?>[ackFuts.size()]); + + ackFuts.clear(); + } } if (futs != null) - completeOnNodeLeft(futs); + completeOnNodeLeft(futs, akFuts); } /** @@ -214,6 +244,13 @@ public class GridNioRecoveryDescriptor { } /** + * @return Futures for unacknowledged messages. + */ + public Collection<GridFutureAdapter<Boolean>> ackMessageFutures() { + return ackFuts.values(); + } + + /** * @param node Node. * @return {@code True} if node is not null and has the same order as initial remtoe node. */ @@ -278,6 +315,7 @@ public class GridNioRecoveryDescriptor { */ public void release() { GridNioFuture<?>[] futs = null; + GridFutureAdapter<?>[] akFuts = null; synchronized (this) { connected = false; @@ -302,10 +340,16 @@ public class GridNioRecoveryDescriptor { msgFuts.clear(); } + + if (nodeLeft && !ackFuts.isEmpty()) { + akFuts = ackFuts.values().toArray(new GridFutureAdapter<?>[ackFuts.size()]); + + ackFuts.clear(); + } } if (futs != null) - completeOnNodeLeft(futs); + completeOnNodeLeft(futs, akFuts); } /** @@ -356,10 +400,14 @@ public class GridNioRecoveryDescriptor { /** * @param futs Futures to complete. + * @param ackFuts Ack futures to complete. */ - private void completeOnNodeLeft(GridNioFuture<?>[] futs) { + private void completeOnNodeLeft(GridNioFuture<?>[] futs, GridFutureAdapter<?>[] ackFuts) { for (GridNioFuture<?> msg : futs) ((GridNioFutureImpl)msg).onDone(new IOException("Failed to send message, node has left: " + node.id())); + + for (GridFutureAdapter<?> fut : ackFuts) + fut.onDone(new IOException("Failed to send message, node has left: " + node.id())); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index e05c37a..134d271 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -135,6 +137,12 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien } /** {@inheritDoc} */ + @Override public boolean sendMessageWithAck(@Nullable UUID nodeId, Message msg, + GridFutureAdapter<Boolean> fut) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index abad875..834371f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -123,6 +124,37 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie } /** {@inheritDoc} */ + @Override public boolean sendMessageWithAck(@Nullable UUID nodeId, Message msg, + GridFutureAdapter<Boolean> fut) throws IgniteCheckedException { + // Node ID is never provided in asynchronous send mode. + assert nodeId == null; + + GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor(); + + GridNioFuture<?> nioFut = ses.send(msg); + + if (nioFut.isDone()) { + try { + nioFut.get(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send message [client=" + this + ", err=" + e + ']'); + + if (e.getCause() instanceof IOException) + return true; + else + throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e); + } + } + + if (recovery != null) + recovery.add(nioFut, fut); + + return false; + } + + /** {@inheritDoc} */ @Override public boolean async() { return true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index a0acb5c..53c6ddf 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1350,7 +1350,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (slowClientQueueLimit > 0 && msgQueueLimit > 0 && slowClientQueueLimit >= msgQueueLimit) { U.quietAndWarn(log, "Slow client queue limit is set to a value greater than message queue limit " + "(slow client queue limit will have no effect) [msgQueueLimit=" + msgQueueLimit + - ", slowClientQueueLimit=" + slowClientQueueLimit + ']'); + ", slowClientQueueLimit=" + slowClientQueueLimit + ']'); } registerMBean(gridName, this, TcpCommunicationSpiMBean.class); @@ -1749,6 +1749,73 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** + * + * @param node + * @param msg + * @return + * @throws IgniteSpiException + */ + public IgniteInternalFuture<Boolean> sendMessageWithAck(ClusterNode node, Message msg) throws IgniteSpiException { + assert node != null; + assert msg != null; + + if (log.isTraceEnabled()) + log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']'); + + IgniteInternalFuture<Boolean> fut = null; + + if (node.id().equals(getLocalNode().id())) { + notifyListener(node.id(), msg, NOOP); + + fut = new GridFinishedFuture<>(true); + } + else { + GridCommunicationClient client = null; + + try { + boolean retry; + + do { + client = reserveClient(node); + + UUID nodeId = null; + + if (!client.async() && !getSpiContext().localNode().version().equals(node.version())) + nodeId = node.id(); + + fut = new GridFutureAdapter<>(); + + retry = client.sendMessageWithAck(nodeId, msg, (GridFutureAdapter)fut); + + client.release(); + + client = null; + + if (!retry) + sentMsgsCnt.increment(); + else { + ClusterNode node0 = getSpiContext().node(node.id()); + + if (node0 == null) + throw new IgniteCheckedException("Failed to send message to remote node " + + "(node has left the grid): " + node.id()); + } + } + while (retry); + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException("Failed to send message to remote node: " + node, e); + } + finally { + if (client != null && clients.remove(node.id(), client)) + client.forceClose(); + } + } + + return fut; + } + + /** * Returns existing or just created client to node. * * @param node Node to which client should be open. @@ -2086,6 +2153,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine); } + if (recoveryDesc != null) { recoveryDesc.onHandshake(rcvCnt); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java new file mode 100644 index 0000000..2c2ef2e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.*; +import org.apache.ignite.testframework.junits.spi.*; + +import org.eclipse.jetty.util.*; + +import java.net.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") +public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> { + /** */ + private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>(); + + /** */ + protected static final List<TcpCommunicationSpi> spis = new ArrayList<>(); + + /** */ + protected static final List<ClusterNode> nodes = new ArrayList<>(); + + /** */ + private static final int SPI_CNT = 2; + + /** + * + */ + static { + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new GridTestMessage(); + } + }); + } + + /** + * Disable SPI auto-start. + */ + public GridTcpCommunicationSpiRecoveryAckFutureSelfTest() { + super(false); + } + + /** */ + @SuppressWarnings({"deprecation"}) + private class TestListener implements CommunicationListener<Message> { + /** */ + private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>(); + + /** */ + private AtomicInteger rcvCnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) { + info("Test listener received message: " + msg); + + assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage); + + GridTestMessage msg0 = (GridTestMessage)msg; + + assertTrue("Duplicated message received: " + msg0, msgIds.add(msg0.getMsgId())); + + rcvCnt.incrementAndGet(); + + msgC.run(); + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(UUID nodeId) { + // No-op. + } + } + + /** + * @throws Exception If failed. + */ + public void testAckOnIdle() throws Exception { + checkAck(10, 2000, 9); + } + + /** + * @throws Exception If failed. + */ + public void testAckOnCount() throws Exception { + checkAck(10, 60_000, 10); + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param msgPerIter Messages per iteration. + * @throws Exception If failed. + */ + private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception { + createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT); + + try { + TcpCommunicationSpi spi0 = spis.get(0); + TcpCommunicationSpi spi1 = spis.get(1); + + ClusterNode node0 = nodes.get(0); + ClusterNode node1 = nodes.get(1); + + int msgId = 0; + + int expMsgs = 0; + + for (int i = 0; i < 5; i++) { + info("Iteration: " + i); + + List<IgniteInternalFuture<Boolean>> futs = new ArrayList<>(); + + for (int j = 0; j < msgPerIter; j++) { + futs.add(spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0))); + + futs.add(spi1.sendMessageWithAck(node0, new GridTestMessage(node1.id(), ++msgId, 0))); + } + + expMsgs += msgPerIter; + + for (TcpCommunicationSpi spi : spis) { + GridNioServer srv = U.field(spi, "nioSrvr"); + + Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + assertFalse(sessions.isEmpty()); + + boolean found = false; + + for (GridNioSession ses : sessions) { + final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor(); + + if (recoveryDesc != null) { + found = true; + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return recoveryDesc.messagesFutures().isEmpty(); + } + }, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() + 7000 : + 10_000); + + assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0, + recoveryDesc.messagesFutures().size()); + + assertEquals("Unexpected ack messages: " + recoveryDesc.ackMessageFutures(), 0, + recoveryDesc.ackMessageFutures().size()); + + break; + } + } + + assertTrue(found); + } + + final int expMsgs0 = expMsgs; + + for (TcpCommunicationSpi spi : spis) { + final TestListener lsnr = (TestListener)spi.getListener(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override + public boolean apply() { + return lsnr.rcvCnt.get() >= expMsgs0; + } + }, 5000); + + assertEquals(expMsgs, lsnr.rcvCnt.get()); + } + + for (IgniteInternalFuture<Boolean> f : futs) + assert f.get(); + } + } + finally { + stopSpis(); + } + } + + /** + * @throws Exception If failed. + */ + public void testQueueOverflow() throws Exception { + for (int i = 0; i < 3; i++) { + try { + startSpis(5, 60_000, 10); + + checkOverflow(); + + break; + } + catch (IgniteCheckedException e) { + if (e.hasCause(BindException.class)) { + if (i < 2) { + info("Got exception caused by BindException, will retry after delay: " + e); + + stopSpis(); + + U.sleep(10_000); + } + else + throw e; + } + else + throw e; + } + finally { + stopSpis(); + } + } + } + + /** + * @throws Exception If failed. + */ + private void checkOverflow() throws Exception { + TcpCommunicationSpi spi0 = spis.get(0); + TcpCommunicationSpi spi1 = spis.get(1); + + ClusterNode node0 = nodes.get(0); + ClusterNode node1 = nodes.get(1); + + final GridNioServer srv1 = U.field(spi1, "nioSrvr"); + + int msgId = 0; + + // Send message to establish connection. + spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + + // Prevent node1 from send + GridTestUtils.setFieldValue(srv1, "skipWrite", true); + + final GridNioSession ses0 = communicationSession(spi0); + + for (int i = 0; i < 150; i++) + spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + + // Wait when session is closed because of queue overflow. + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return ses0.closeTime() != 0; + } + }, 5000); + + assertTrue("Failed to wait for session close", ses0.closeTime() != 0); + + GridTestUtils.setFieldValue(srv1, "skipWrite", false); + + for (int i = 0; i < 100; i++) + spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + + final int expMsgs = 251; + + final TestListener lsnr = (TestListener)spi1.getListener(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override + public boolean apply() { + return lsnr.rcvCnt.get() >= expMsgs; + } + }, 5000); + + assertEquals(expMsgs, lsnr.rcvCnt.get()); + } + + /** + * @param spi SPI. + * @return Session. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception { + final GridNioServer srv = U.field(spi, "nioSrvr"); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override + public boolean apply() { + Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + return !sessions.isEmpty(); + } + }, 5000); + + Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + assertEquals(1, sessions.size()); + + return sessions.iterator().next(); + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param queueLimit Message queue limit. + * @return SPI instance. + */ + protected TcpCommunicationSpi getSpi(int ackCnt, int idleTimeout, int queueLimit) { + TcpCommunicationSpi spi = new TcpCommunicationSpi(); + + spi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); + spi.setIdleConnectionTimeout(idleTimeout); + spi.setTcpNoDelay(true); + spi.setAckSendThreshold(ackCnt); + spi.setMessageQueueLimit(queueLimit); + spi.setSharedMemoryPort(-1); + + return spi; + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param queueLimit Message queue limit. + * @throws Exception If failed. + */ + private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception { + spis.clear(); + nodes.clear(); + spiRsrcs.clear(); + + Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>(); + + for (int i = 0; i < SPI_CNT; i++) { + TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit); + + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i); + + IgniteTestResources rsrcs = new IgniteTestResources(); + + GridTestNode node = new GridTestNode(rsrcs.getNodeId()); + + GridSpiTestContext ctx = initSpiContext(); + + ctx.setLocalNode(node); + + spiRsrcs.add(rsrcs); + + rsrcs.inject(spi); + + spi.setListener(new TestListener()); + + node.setAttributes(spi.getNodeAttributes()); + + nodes.add(node); + + spi.spiStart(getTestGridName() + (i + 1)); + + spis.add(spi); + + spi.onContextInitialized(ctx); + + ctxs.put(node, ctx); + } + + // For each context set remote nodes. + for (Map.Entry<ClusterNode, GridSpiTestContext> e : ctxs.entrySet()) { + for (ClusterNode n : nodes) { + if (!n.equals(e.getKey())) + e.getValue().remoteNodes().add(n); + } + } + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param queueLimit Message queue limit. + * @throws Exception If failed. + */ + private void createSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception { + for (int i = 0; i < 3; i++) { + try { + startSpis(ackCnt, idleTimeout, queueLimit); + + break; + } + catch (IgniteCheckedException e) { + if (e.hasCause(BindException.class)) { + if (i < 2) { + info("Failed to start SPIs because of BindException, will retry after delay."); + + stopSpis(); + + U.sleep(10_000); + } + else + throw e; + } + else + throw e; + } + } + } + + /** + * @throws Exception If failed. + */ + private void stopSpis() throws Exception { + for (CommunicationSpi<Message> spi : spis) { + spi.onContextDestroyed(); + + spi.setListener(null); + + spi.spiStop(); + } + + for (IgniteTestResources rsrcs : spiRsrcs) + rsrcs.stopThreads(); + + spis.clear(); + nodes.clear(); + spiRsrcs.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java index ff86bda..dcb8058 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java @@ -32,6 +32,7 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite { TestSuite suite = new TestSuite("Communication SPI Test Suite"); suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckSelfTest.class)); + suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckFutureSelfTest.class)); suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoverySelfTest.class)); suite.addTest(new TestSuite(GridTcpCommunicationSpiConcurrentConnectSelfTest.class));