Repository: camel Updated Branches: refs/heads/camel-2.17.x 38c75c5cc -> 1d3e4a2d6
CAMEL-10024: Cherry picked from master Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1d3e4a2d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1d3e4a2d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1d3e4a2d Branch: refs/heads/camel-2.17.x Commit: 1d3e4a2d6dff57b9487574972df8098cf0d2adcd Parents: 38c75c5 Author: Arno Noordover <anoordo...@users.noreply.github.com> Authored: Sat Jun 18 17:11:31 2016 +0200 Committer: Arno Noordover <anoordo...@users.noreply.github.com> Committed: Fri Jun 24 21:06:44 2016 +0200 ---------------------------------------------------------------------- .../camel/component/mina2/Mina2Consumer.java | 6 +- .../camel/component/mina2/Mina2Producer.java | 74 +++++++++++--------- .../component/mina2/Mina2TextLineDelimiter.java | 18 ++++- ...Mina2ClientModeTcpTextlineDelimiterTest.java | 2 +- .../mina2/Mina2DisconnectRaceConditionTest.java | 70 ++++++++++++++++++ .../component/mina2/Mina2EncodingTest.java | 2 +- .../mina2/Mina2ExchangeTimeOutTest.java | 2 +- .../mina2/Mina2NoResponseFromServerTest.java | 4 +- .../mina2/Mina2ProducerShutdownMockTest.java | 7 +- .../mina2/Mina2ReverseProtocolHandler.java | 2 +- .../mina2/Mina2TransferExchangeOptionTest.java | 2 +- 11 files changed, 141 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1d3e4a2d/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java index 2e913e7..ac461da 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java @@ -115,7 +115,7 @@ public class Mina2Consumer extends DefaultConsumer { if (configuration.isClientMode() && configuration.getProtocol().equals("tcp")) { LOG.info("Disconnect from server address: {} using connector: {}", address, connector); if (session != null) { - CloseFuture closeFuture = session.close(true); + CloseFuture closeFuture = session.closeNow(); closeFuture.awaitUninterruptibly(); } connector.dispose(true); @@ -382,7 +382,7 @@ public class Mina2Consumer extends DefaultConsumer { // close invalid session if (session != null) { LOG.warn("Closing session as an exception was thrown from MINA"); - session.close(true); + session.closeNow(); } // must wrap and rethrow since cause can be of Throwable and we must only throw Exception @@ -456,7 +456,7 @@ public class Mina2Consumer extends DefaultConsumer { } if (disconnect) { LOG.debug("Closing session when complete at address: {}", address); - session.close(true); + session.closeNow(); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/1d3e4a2d/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java index 132abe3..8e3eaec 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java @@ -65,8 +65,10 @@ import org.slf4j.LoggerFactory; public class Mina2Producer extends DefaultProducer implements ServicePoolAware { private static final Logger LOG = LoggerFactory.getLogger(Mina2Producer.class); + private final ResponseHandler handler; private IoSession session; - private CountDownLatch latch; + private CountDownLatch responseLatch; + private CountDownLatch closeLatch; private boolean lazySessionCreation; private long timeout; private SocketAddress address; @@ -93,6 +95,8 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { } else if (protocol.equals("vm")) { setupVmProtocol(protocol); } + handler = new ResponseHandler(); + connector.setHandler(handler); } @Override @@ -143,10 +147,9 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { // if sync is true then we should also wait for a response (synchronous mode) if (sync) { - // only initialize latch if we should get a response - latch = new CountDownLatch(1); + // only initialize responseLatch if we should get a response + responseLatch = new CountDownLatch(1); // reset handler if we expect a response - ResponseHandler handler = (ResponseHandler) session.getHandler(); handler.reset(); } @@ -165,13 +168,12 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { if (sync) { // wait for response, consider timeout LOG.debug("Waiting for response using timeout {} millis.", timeout); - boolean done = latch.await(timeout, TimeUnit.MILLISECONDS); + boolean done = responseLatch.await(timeout, TimeUnit.MILLISECONDS); if (!done) { throw new ExchangeTimedOutException(exchange, timeout); } // did we get a response - ResponseHandler handler = (ResponseHandler) session.getHandler(); if (handler.getCause() != null) { throw new CamelExchangeException("Error occurred in ResponseHandler", exchange, handler.getCause()); } else if (!handler.isMessageReceived()) { @@ -188,7 +190,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { } } - protected void maybeDisconnectOnDone(Exchange exchange) { + protected void maybeDisconnectOnDone(Exchange exchange) throws InterruptedException { if (session == null) { return; } @@ -208,7 +210,16 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { } if (disconnect) { LOG.debug("Closing session when complete at address: {}", address); - session.close(true); + closeSessionIfNeededAndAwaitCloseInHandler(session); + } + } + + private void closeSessionIfNeededAndAwaitCloseInHandler(IoSession sessionToBeClosed) throws InterruptedException { + closeLatch = new CountDownLatch(1); + if (!sessionToBeClosed.isClosing()) { + CloseFuture closeFuture = sessionToBeClosed.closeNow(); + closeFuture.await(timeout, TimeUnit.MILLISECONDS); + closeLatch.await(timeout, TimeUnit.MILLISECONDS); } } @@ -241,10 +252,9 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { super.doShutdown(); } - private void closeConnection() { + private void closeConnection() throws InterruptedException { if (session != null) { - CloseFuture closeFuture = session.close(true); - closeFuture.awaitUninterruptibly(); + closeSessionIfNeededAndAwaitCloseInHandler(session); } connector.dispose(true); @@ -255,14 +265,13 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { setSocketAddress(this.configuration.getProtocol()); } if (LOG.isDebugEnabled()) { - LOG.debug("Creating connector to address: {} using connector: {} timeout: {} millis.", new Object[]{address, connector, timeout}); + LOG.debug("Creating connector to address: {} using connector: {} timeout: {} millis.", address, connector, timeout); } // connect and wait until the connection is established if (connectorConfig != null) { connector.getSessionConfig().setAll(connectorConfig); } - connector.setHandler(new ResponseHandler()); ConnectFuture future = connector.connect(address); future.awaitUninterruptibly(); session = future.getSession(); @@ -342,7 +351,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { } addCodecFactory(service, codecFactory); LOG.debug("{}: Using TextLineCodecFactory: {} using encoding: {} line delimiter: {}({})", - new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter}); + type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter); LOG.debug("Encoder maximum line length: {}. Decoder maximum line length: {}", codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength()); } else { @@ -412,21 +421,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { if (delimiter == null) { return LineDelimiter.DEFAULT; } - - switch (delimiter) { - case DEFAULT: - return LineDelimiter.DEFAULT; - case AUTO: - return LineDelimiter.AUTO; - case UNIX: - return LineDelimiter.UNIX; - case WINDOWS: - return LineDelimiter.WINDOWS; - case MAC: - return LineDelimiter.MAC; - default: - throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter); - } + return delimiter.getLineDelimiter(); } private Charset getEncodingParameter(String type, Mina2Configuration configuration) { @@ -483,11 +478,11 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { this.message = message; messageReceived = true; cause = null; - countDown(); + notifyResultAvailable(); } - protected void countDown() { - CountDownLatch downLatch = latch; + protected void notifyResultAvailable() { + CountDownLatch downLatch = responseLatch; if (downLatch != null) { downLatch.countDown(); } @@ -500,7 +495,14 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { LOG.debug("Session closed but no message received from address: {}", address); // session was closed but no message received. This could be because the remote server had an internal error // and could not return a response. We should count down to stop waiting for a response - countDown(); + notifyResultAvailable(); + } + notifySessionClosed(); + } + + private void notifySessionClosed() { + if (closeLatch != null) { + closeLatch.countDown(); } } @@ -512,7 +514,11 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { this.messageReceived = false; this.cause = cause; if (ioSession != null) { - ioSession.close(true); + try { + closeSessionIfNeededAndAwaitCloseInHandler(ioSession); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/1d3e4a2d/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2TextLineDelimiter.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2TextLineDelimiter.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2TextLineDelimiter.java index 8bf87c7..bc83a7e 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2TextLineDelimiter.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2TextLineDelimiter.java @@ -16,10 +16,26 @@ */ package org.apache.camel.component.mina2; +import org.apache.mina.filter.codec.textline.LineDelimiter; + /** * Possible text line delimiters to be used with the textline codec. */ public enum Mina2TextLineDelimiter { - DEFAULT, AUTO, UNIX, WINDOWS, MAC + DEFAULT(LineDelimiter.DEFAULT), + AUTO(LineDelimiter.AUTO), + UNIX(LineDelimiter.UNIX), + WINDOWS(LineDelimiter.WINDOWS), + MAC(LineDelimiter.MAC); + + private final LineDelimiter lineDelimiter; + + Mina2TextLineDelimiter(LineDelimiter lineDelimiter) { + this.lineDelimiter = lineDelimiter; + } + + public LineDelimiter getLineDelimiter() { + return lineDelimiter; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/1d3e4a2d/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java index b160add..62229fc 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java @@ -85,7 +85,7 @@ public class Mina2ClientModeTcpTextlineDelimiterTest extends BaseMina2Test { private class ServerHandler extends IoHandlerAdapter { public void sessionOpened(IoSession session) throws Exception { session.write("Hello there!\n"); - session.close(true); + session.closeNow(); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1d3e4a2d/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java new file mode 100644 index 0000000..6b4b353 --- /dev/null +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mina2; + +import java.lang.reflect.Field; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultExchange; +import org.apache.mina.core.session.IoSession; +import org.junit.Test; + +public class Mina2DisconnectRaceConditionTest extends BaseMina2Test { + + /** + * This is a test for issue CAMEL-10024 - the closing must complete before we return from the producer + * + * @throws Exception + */ + @Test + public void testCloseSessionWhenCompleteManyTimes() throws Exception { + final String endpointUri = String.format("mina2:tcp://localhost:%1$s?sync=true&textline=true&disconnect=true&minaLogger=true", getPort()); + Mina2Producer producer = (Mina2Producer) context.getEndpoint(endpointUri).createProducer(); + // Access session to check that the session is really closed + Field field = producer.getClass().getDeclaredField("session"); + field.setAccessible(true); + + for (int i = 0; i < 100; i++) { + Exchange e = new DefaultExchange(context, ExchangePattern.InOut); + e.getIn().setBody("Chad"); + producer.process(e); + final IoSession ioSession = (IoSession) field.get(producer); + assertTrue(ioSession.getCloseFuture().isDone()); + Object out = e.getOut().getBody(); + assertEquals("Bye Chad", out); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + public void configure() throws Exception { + from(String.format("mina2:tcp://localhost:%1$s?sync=true&textline=true", getPort())).process(new Processor() { + + public void process(Exchange exchange) throws Exception { + String body = exchange.getIn().getBody(String.class); + exchange.getOut().setBody("Bye " + body); + } + }); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1d3e4a2d/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java index f6130c1..72d4994 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java @@ -170,7 +170,7 @@ public class Mina2EncodingTest extends BaseMina2Test { Endpoint endpoint = context.getEndpoint(uri); Producer producer = endpoint.createProducer(); - Exchange exchange = producer.createExchange(); + Exchange exchange = endpoint.createExchange(); exchange.getIn().setBody(hello); producer.start(); http://git-wip-us.apache.org/repos/asf/camel/blob/1d3e4a2d/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java index 512c1f6..d24743f 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java @@ -35,7 +35,7 @@ public class Mina2ExchangeTimeOutTest extends BaseMina2Test { Endpoint endpoint = context.getEndpoint(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=true&timeout=500", getPort())); Producer producer = endpoint.createProducer(); producer.start(); - Exchange exchange = producer.createExchange(); + Exchange exchange = endpoint.createExchange(); exchange.getIn().setBody("Hello World"); try { producer.process(exchange); http://git-wip-us.apache.org/repos/asf/camel/blob/1d3e4a2d/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java index 25b82dd..7870efc 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java @@ -73,7 +73,7 @@ public class Mina2NoResponseFromServerTest extends BaseMina2Test { public void encode(IoSession ioSession, Object message, ProtocolEncoderOutput out) throws Exception { // close session instead of returning a reply - ioSession.close(true); + ioSession.closeNow(); } public void dispose(IoSession ioSession) throws Exception { @@ -89,7 +89,7 @@ public class Mina2NoResponseFromServerTest extends BaseMina2Test { public void decode(IoSession ioSession, IoBuffer in, ProtocolDecoderOutput out) throws Exception { // close session instead of returning a reply - ioSession.close(true); + ioSession.closeNow(); } public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) http://git-wip-us.apache.org/repos/asf/camel/blob/1d3e4a2d/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java index c36bade..ab110a0 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java @@ -27,9 +27,10 @@ import org.apache.camel.component.mock.MockEndpoint; import org.apache.mina.transport.socket.SocketConnector; import org.junit.Test; -import static org.easymock.classextension.EasyMock.createMock; -import static org.easymock.classextension.EasyMock.replay; -import static org.easymock.classextension.EasyMock.verify; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + /** * Unit testing for using a MinaProducer that it can shutdown properly (CAMEL-395) http://git-wip-us.apache.org/repos/asf/camel/blob/1d3e4a2d/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java index 04579fa..6267dc0 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java @@ -30,7 +30,7 @@ public class Mina2ReverseProtocolHandler extends IoHandlerAdapter { public void exceptionCaught(IoSession session, Throwable cause) { cause.printStackTrace(); // Close connection when unexpected exception is caught. - session.close(true); + session.closeNow(); } public void messageReceived(IoSession session, Object message) { http://git-wip-us.apache.org/repos/asf/camel/blob/1d3e4a2d/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java index 282d471..74b5683 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java @@ -53,7 +53,7 @@ public class Mina2TransferExchangeOptionTest extends BaseMina2Test { private Exchange sendExchange(boolean setException) throws Exception { Endpoint endpoint = context.getEndpoint(String.format("mina2:tcp://localhost:%1$s?sync=true&encoding=UTF-8&transferExchange=true", getPort())); Producer producer = endpoint.createProducer(); - Exchange exchange = producer.createExchange(); + Exchange exchange = endpoint.createExchange(); //Exchange exchange = endpoint.createExchange(); Message message = exchange.getIn();