This is an automated email from the ASF dual-hosted git repository. johnnyv pushed a commit to branch bugfix/DIRMINA-1142 in repository https://gitbox.apache.org/repos/asf/mina.git
commit eb3a160febd4db7d4f07cde7fb8727adb984ebdb Author: Jonathan Valliere <john...@apache.org> AuthorDate: Tue May 18 13:31:40 2021 -0400 Adds unit test for DIRMINA-1142 --- .../filter/codec/ParallelProtocolEncoderTest.java | 184 +++++++++++++++++++++ 1 file changed, 184 insertions(+) diff --git a/mina-core/src/test/java/org/apache/mina/filter/codec/ParallelProtocolEncoderTest.java b/mina-core/src/test/java/org/apache/mina/filter/codec/ParallelProtocolEncoderTest.java new file mode 100644 index 0000000..905dcf1 --- /dev/null +++ b/mina-core/src/test/java/org/apache/mina/filter/codec/ParallelProtocolEncoderTest.java @@ -0,0 +1,184 @@ +package org.apache.mina.filter.codec; + +import static org.junit.Assert.assertTrue; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.mina.core.future.ConnectFuture; +import org.apache.mina.core.future.IoFutureListener; +import org.apache.mina.core.future.WriteFuture; +import org.apache.mina.core.service.IoHandlerAdapter; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; +import org.apache.mina.filter.codec.textline.TextLineCodecFactory; +import org.apache.mina.transport.socket.nio.NioSocketAcceptor; +import org.apache.mina.transport.socket.nio.NioSocketConnector; +import org.junit.Test; + +public class ParallelProtocolEncoderTest { + private NioSocketConnector connector = null; + private NioSocketAcceptor acceptor = null; + private static int LOOP = 1000; + private static int THREAD = 3; + + private static Logger logger = LogManager.getLogger(ParallelProtocolEncoderTest.class); + private static ExecutorService executorService = Executors.newFixedThreadPool(THREAD); + + @Test + public void missingMessageTest() throws Exception { + String host = "localhost"; + int port = 28_000; + + // server + acceptor = new NioSocketAcceptor(); + acceptor.getFilterChain().addFirst("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); + ServerHandler serverHandler = new ServerHandler(); + acceptor.setHandler(serverHandler); + acceptor.bind(new InetSocketAddress(host, port)); + + // client + connector = new NioSocketConnector(1); + connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); + ClientHandler clientHandler = new ClientHandler(); + connector.setHandler(clientHandler); + ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, 28_000)); + connectFuture.awaitUninterruptibly(); + + final IoSession ioSession = connectFuture.getSession(); + + logger.info("missingMessageTest.begin with " + LOOP + " messages and " + THREAD + " threads"); + + for (int i = 1; i <= LOOP; i++) { + final String message = "Message:" + i; + executorService.submit(new Runnable() { + + @Override + public void run() { + + logger.info("missingMessageTest.client.write "+message); + + final WriteFuture future = ioSession.write(message); + if (future != null) { + future.addListener(new IoFutureListener<WriteFuture>() { + @Override + public void operationComplete(WriteFuture writeFuture) { + if (!future.isWritten()) { + logger.error("writeFuture: " + writeFuture.getException()); + } + } + }); + } + } + }); + + } + logger.info("missingMessageTest.end"); + + int maxSleep = 5_000; + int time = 1000; + int sleep = 0; + while ((!clientHandler.isFinished() || !serverHandler.isFinished()) && maxSleep > sleep) { + sleep += time; + logger.info("missingMessageTest.sleep... " + sleep); + Thread.sleep(time); + } + + logger.info("missingMessageTest.close"); + + ioSession.closeNow(); + connector.dispose(); + acceptor.dispose(); + + if (!serverHandler.isFinished()) { + Set<String> missingMessages = clientHandler.getMessages(); + missingMessages.removeAll(serverHandler.getMessages()); + logger.error("missing <" + missingMessages.size() + "> messages : " + missingMessages); + } + + assertTrue(serverHandler.isFinished()); + assertTrue(clientHandler.isFinished()); + } + + private static class ServerHandler extends IoHandlerAdapter { + private Set<String> messages = new HashSet<>(LOOP); + private AtomicInteger count = new AtomicInteger(0); + + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + + String messageString = (String) message; + count.incrementAndGet(); + + if (messages.contains(messageString)) { + logger.error("messageReceived: message <" + messageString + "> already received"); + } + messages.add(messageString); + + // logger.info("messageReceived: <"+message+">, count="+count); + + if (isFinished()) { + logger.info("messageReceived: finish"); + } + + super.messageReceived(session, message); + } + + public boolean isFinished() { + return count.get() == LOOP; + } + + /** + * Get the messages. + * + * @return the messages + */ + public Set<String> getMessages() { + return messages; + } + } + + private static class ClientHandler extends IoHandlerAdapter { + private Set<String> messages = new HashSet<>(LOOP); + private AtomicInteger count = new AtomicInteger(0); + + @Override + public void messageSent(IoSession session, Object message) throws Exception { + + logger.info("messageSent " + message); + + count.incrementAndGet(); + String messageString = (String) message; + if (messages.contains(messageString)) { + logger.error("messageSent: message <" + messageString + "> already sent"); + } + messages.add(messageString); + + // logger.info("messageSent: <"+message+">, count="+count); + + if (isFinished()) { + logger.info("messageSent: finish"); + } + super.messageSent(session, message); + } + + public boolean isFinished() { + return count.get() == LOOP; + } + + /** + * Get the messages. + * + * @return the messages + */ + public Set<String> getMessages() { + return messages; + } + } +}