Repository: camel Updated Branches: refs/heads/master 8f2bc1514 -> 76544116f
CAMEL-10024: sync on close and deprecation Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fe41b1bb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fe41b1bb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fe41b1bb Branch: refs/heads/master Commit: fe41b1bb9ac743214a6f6baba393a251e55037fc Parents: e784761 Author: Arno Noordover <anoordo...@users.noreply.github.com> Authored: Fri Jun 17 16:47:14 2016 +0200 Committer: Arno Noordover <anoordo...@users.noreply.github.com> Committed: Fri Jun 17 16:47:14 2016 +0200 ---------------------------------------------------------------------- .../camel/component/mina2/Mina2Consumer.java | 6 +- .../camel/component/mina2/Mina2Producer.java | 36 +++++++--- ...Mina2ClientModeTcpTextlineDelimiterTest.java | 2 +- .../mina2/Mina2DisconnectRaceConditionTest.java | 70 ++++++++++++++++++++ .../component/mina2/Mina2EncodingTest.java | 2 +- .../mina2/Mina2ExchangeTimeOutTest.java | 2 +- .../mina2/Mina2NoResponseFromServerTest.java | 4 +- .../mina2/Mina2ProducerShutdownMockTest.java | 7 +- .../mina2/Mina2ReverseProtocolHandler.java | 2 +- .../mina2/Mina2TransferExchangeOptionTest.java | 2 +- 10 files changed, 109 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java index c2a7c50..a916bab 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java @@ -115,7 +115,7 @@ public class Mina2Consumer extends DefaultConsumer { if (configuration.isClientMode() && configuration.getProtocol().equals("tcp")) { LOG.info("Disconnect from server address: {} using connector: {}", address, connector); if (session != null) { - CloseFuture closeFuture = session.close(true); + CloseFuture closeFuture = session.closeNow(); closeFuture.awaitUninterruptibly(); } connector.dispose(true); @@ -382,7 +382,7 @@ public class Mina2Consumer extends DefaultConsumer { // close invalid session if (session != null) { LOG.warn("Closing session as an exception was thrown from MINA"); - session.close(true); + session.closeNow(); } // must wrap and rethrow since cause can be of Throwable and we must only throw Exception @@ -456,7 +456,7 @@ public class Mina2Consumer extends DefaultConsumer { } if (disconnect) { LOG.debug("Closing session when complete at address: {}", address); - session.close(true); + session.closeNow(); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java index fdd02dc..4337075 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java @@ -65,6 +65,7 @@ import org.slf4j.LoggerFactory; public class Mina2Producer extends DefaultProducer implements ServicePoolAware { private static final Logger LOG = LoggerFactory.getLogger(Mina2Producer.class); + private final ResponseHandler handler; private IoSession session; private CountDownLatch latch; private boolean lazySessionCreation; @@ -76,6 +77,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { private Mina2Configuration configuration; private IoSessionConfig connectorConfig; private ExecutorService workerPool; + private CountDownLatch closeLatch; public Mina2Producer(Mina2Endpoint endpoint) throws Exception { super(endpoint); @@ -93,6 +95,8 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { } else if (protocol.equals("vm")) { setupVmProtocol(protocol); } + handler = new ResponseHandler(); + connector.setHandler(handler); } @Override @@ -146,7 +150,6 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { // only initialize latch if we should get a response latch = new CountDownLatch(1); // reset handler if we expect a response - ResponseHandler handler = (ResponseHandler) session.getHandler(); handler.reset(); } @@ -171,7 +174,6 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { } // did we get a response - ResponseHandler handler = (ResponseHandler) session.getHandler(); if (handler.getCause() != null) { throw new CamelExchangeException("Error occurred in ResponseHandler", exchange, handler.getCause()); } else if (!handler.isMessageReceived()) { @@ -188,7 +190,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { } } - protected void maybeDisconnectOnDone(Exchange exchange) { + protected void maybeDisconnectOnDone(Exchange exchange) throws InterruptedException { if (session == null) { return; } @@ -208,7 +210,16 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { } if (disconnect) { LOG.debug("Closing session when complete at address: {}", address); - session.close(true); + closeSessionIfNeededAndAwaitCloseInHandler(session); + } + } + + private void closeSessionIfNeededAndAwaitCloseInHandler(IoSession sessionToBeClosed) throws InterruptedException { + closeLatch = new CountDownLatch(1); + if (!sessionToBeClosed.isClosing()) { + CloseFuture closeFuture = sessionToBeClosed.closeNow(); + closeFuture.await(timeout, TimeUnit.MILLISECONDS); + closeLatch.await(timeout, TimeUnit.MILLISECONDS); } } @@ -241,10 +252,9 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { super.doShutdown(); } - private void closeConnection() { + private void closeConnection() throws InterruptedException { if (session != null) { - CloseFuture closeFuture = session.close(true); - closeFuture.awaitUninterruptibly(); + closeSessionIfNeededAndAwaitCloseInHandler(session); } connector.dispose(true); @@ -255,14 +265,13 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { setSocketAddress(this.configuration.getProtocol()); } if (LOG.isDebugEnabled()) { - LOG.debug("Creating connector to address: {} using connector: {} timeout: {} millis.", new Object[]{address, connector, timeout}); + LOG.debug("Creating connector to address: {} using connector: {} timeout: {} millis.", address, connector, timeout); } // connect and wait until the connection is established if (connectorConfig != null) { connector.getSessionConfig().setAll(connectorConfig); } - connector.setHandler(new ResponseHandler()); ConnectFuture future = connector.connect(address); future.awaitUninterruptibly(); session = future.getSession(); @@ -342,7 +351,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { } addCodecFactory(service, codecFactory); LOG.debug("{}: Using TextLineCodecFactory: {} using encoding: {} line delimiter: {}({})", - new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter}); + type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter); LOG.debug("Encoder maximum line length: {}. Decoder maximum line length: {}", codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength()); } else { @@ -502,6 +511,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { // and could not return a response. We should count down to stop waiting for a response countDown(); } + closeLatch.countDown(); } @Override @@ -512,7 +522,11 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { this.messageReceived = false; this.cause = cause; if (ioSession != null) { - ioSession.close(true); + try { + closeSessionIfNeededAndAwaitCloseInHandler(ioSession); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java index ba11d11..3f7bb47 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java @@ -86,7 +86,7 @@ public class Mina2ClientModeTcpTextlineDelimiterTest extends BaseMina2Test { private class ServerHandler extends IoHandlerAdapter { public void sessionOpened(IoSession session) throws Exception { session.write("Hello there!\n"); - session.close(true); + session.closeNow(); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java new file mode 100644 index 0000000..6b4b353 --- /dev/null +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mina2; + +import java.lang.reflect.Field; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultExchange; +import org.apache.mina.core.session.IoSession; +import org.junit.Test; + +public class Mina2DisconnectRaceConditionTest extends BaseMina2Test { + + /** + * This is a test for issue CAMEL-10024 - the closing must complete before we return from the producer + * + * @throws Exception + */ + @Test + public void testCloseSessionWhenCompleteManyTimes() throws Exception { + final String endpointUri = String.format("mina2:tcp://localhost:%1$s?sync=true&textline=true&disconnect=true&minaLogger=true", getPort()); + Mina2Producer producer = (Mina2Producer) context.getEndpoint(endpointUri).createProducer(); + // Access session to check that the session is really closed + Field field = producer.getClass().getDeclaredField("session"); + field.setAccessible(true); + + for (int i = 0; i < 100; i++) { + Exchange e = new DefaultExchange(context, ExchangePattern.InOut); + e.getIn().setBody("Chad"); + producer.process(e); + final IoSession ioSession = (IoSession) field.get(producer); + assertTrue(ioSession.getCloseFuture().isDone()); + Object out = e.getOut().getBody(); + assertEquals("Bye Chad", out); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + public void configure() throws Exception { + from(String.format("mina2:tcp://localhost:%1$s?sync=true&textline=true", getPort())).process(new Processor() { + + public void process(Exchange exchange) throws Exception { + String body = exchange.getIn().getBody(String.class); + exchange.getOut().setBody("Bye " + body); + } + }); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java index 63328d5..17a120f 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java @@ -171,7 +171,7 @@ public class Mina2EncodingTest extends BaseMina2Test { Endpoint endpoint = context.getEndpoint(uri); Producer producer = endpoint.createProducer(); - Exchange exchange = producer.createExchange(); + Exchange exchange = endpoint.createExchange(); exchange.getIn().setBody(hello); producer.start(); http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java index 512c1f6..d24743f 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java @@ -35,7 +35,7 @@ public class Mina2ExchangeTimeOutTest extends BaseMina2Test { Endpoint endpoint = context.getEndpoint(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=true&timeout=500", getPort())); Producer producer = endpoint.createProducer(); producer.start(); - Exchange exchange = producer.createExchange(); + Exchange exchange = endpoint.createExchange(); exchange.getIn().setBody("Hello World"); try { producer.process(exchange); http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java index 25b82dd..7870efc 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java @@ -73,7 +73,7 @@ public class Mina2NoResponseFromServerTest extends BaseMina2Test { public void encode(IoSession ioSession, Object message, ProtocolEncoderOutput out) throws Exception { // close session instead of returning a reply - ioSession.close(true); + ioSession.closeNow(); } public void dispose(IoSession ioSession) throws Exception { @@ -89,7 +89,7 @@ public class Mina2NoResponseFromServerTest extends BaseMina2Test { public void decode(IoSession ioSession, IoBuffer in, ProtocolDecoderOutput out) throws Exception { // close session instead of returning a reply - ioSession.close(true); + ioSession.closeNow(); } public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java index c36bade..ab110a0 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java @@ -27,9 +27,10 @@ import org.apache.camel.component.mock.MockEndpoint; import org.apache.mina.transport.socket.SocketConnector; import org.junit.Test; -import static org.easymock.classextension.EasyMock.createMock; -import static org.easymock.classextension.EasyMock.replay; -import static org.easymock.classextension.EasyMock.verify; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + /** * Unit testing for using a MinaProducer that it can shutdown properly (CAMEL-395) http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java index 04579fa..6267dc0 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java @@ -30,7 +30,7 @@ public class Mina2ReverseProtocolHandler extends IoHandlerAdapter { public void exceptionCaught(IoSession session, Throwable cause) { cause.printStackTrace(); // Close connection when unexpected exception is caught. - session.close(true); + session.closeNow(); } public void messageReceived(IoSession session, Object message) { http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java index 282d471..74b5683 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java @@ -53,7 +53,7 @@ public class Mina2TransferExchangeOptionTest extends BaseMina2Test { private Exchange sendExchange(boolean setException) throws Exception { Endpoint endpoint = context.getEndpoint(String.format("mina2:tcp://localhost:%1$s?sync=true&encoding=UTF-8&transferExchange=true", getPort())); Producer producer = endpoint.createProducer(); - Exchange exchange = producer.createExchange(); + Exchange exchange = endpoint.createExchange(); //Exchange exchange = endpoint.createExchange(); Message message = exchange.getIn();