Repository: camel
Updated Branches:
  refs/heads/master 8f2bc1514 -> 76544116f


CAMEL-10024: sync on close and deprecation


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fe41b1bb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fe41b1bb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fe41b1bb

Branch: refs/heads/master
Commit: fe41b1bb9ac743214a6f6baba393a251e55037fc
Parents: e784761
Author: Arno Noordover <anoordo...@users.noreply.github.com>
Authored: Fri Jun 17 16:47:14 2016 +0200
Committer: Arno Noordover <anoordo...@users.noreply.github.com>
Committed: Fri Jun 17 16:47:14 2016 +0200

----------------------------------------------------------------------
 .../camel/component/mina2/Mina2Consumer.java    |  6 +-
 .../camel/component/mina2/Mina2Producer.java    | 36 +++++++---
 ...Mina2ClientModeTcpTextlineDelimiterTest.java |  2 +-
 .../mina2/Mina2DisconnectRaceConditionTest.java | 70 ++++++++++++++++++++
 .../component/mina2/Mina2EncodingTest.java      |  2 +-
 .../mina2/Mina2ExchangeTimeOutTest.java         |  2 +-
 .../mina2/Mina2NoResponseFromServerTest.java    |  4 +-
 .../mina2/Mina2ProducerShutdownMockTest.java    |  7 +-
 .../mina2/Mina2ReverseProtocolHandler.java      |  2 +-
 .../mina2/Mina2TransferExchangeOptionTest.java  |  2 +-
 10 files changed, 109 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
 
b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
index c2a7c50..a916bab 100644
--- 
a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
+++ 
b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
@@ -115,7 +115,7 @@ public class Mina2Consumer extends DefaultConsumer {
         if (configuration.isClientMode() && 
configuration.getProtocol().equals("tcp")) {
             LOG.info("Disconnect from server address: {} using connector: {}", 
address, connector);
             if (session != null) {
-                CloseFuture closeFuture = session.close(true);
+                CloseFuture closeFuture = session.closeNow();
                 closeFuture.awaitUninterruptibly();
             }
             connector.dispose(true);
@@ -382,7 +382,7 @@ public class Mina2Consumer extends DefaultConsumer {
             // close invalid session
             if (session != null) {
                 LOG.warn("Closing session as an exception was thrown from 
MINA");
-                session.close(true);
+                session.closeNow();
             }
 
             // must wrap and rethrow since cause can be of Throwable and we 
must only throw Exception
@@ -456,7 +456,7 @@ public class Mina2Consumer extends DefaultConsumer {
             }
             if (disconnect) {
                 LOG.debug("Closing session when complete at address: {}", 
address);
-                session.close(true);
+                session.closeNow();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
 
b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
index fdd02dc..4337075 100644
--- 
a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
+++ 
b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
@@ -65,6 +65,7 @@ import org.slf4j.LoggerFactory;
 public class Mina2Producer extends DefaultProducer implements ServicePoolAware 
{
 
     private static final Logger LOG = 
LoggerFactory.getLogger(Mina2Producer.class);
+    private final ResponseHandler handler;
     private IoSession session;
     private CountDownLatch latch;
     private boolean lazySessionCreation;
@@ -76,6 +77,7 @@ public class Mina2Producer extends DefaultProducer implements 
ServicePoolAware {
     private Mina2Configuration configuration;
     private IoSessionConfig connectorConfig;
     private ExecutorService workerPool;
+    private CountDownLatch closeLatch;
 
     public Mina2Producer(Mina2Endpoint endpoint) throws Exception {
         super(endpoint);
@@ -93,6 +95,8 @@ public class Mina2Producer extends DefaultProducer implements 
ServicePoolAware {
         } else if (protocol.equals("vm")) {
             setupVmProtocol(protocol);
         }
+        handler = new ResponseHandler();
+        connector.setHandler(handler);
     }
 
     @Override
@@ -146,7 +150,6 @@ public class Mina2Producer extends DefaultProducer 
implements ServicePoolAware {
             // only initialize latch if we should get a response
             latch = new CountDownLatch(1);
             // reset handler if we expect a response
-            ResponseHandler handler = (ResponseHandler) session.getHandler();
             handler.reset();
         }
 
@@ -171,7 +174,6 @@ public class Mina2Producer extends DefaultProducer 
implements ServicePoolAware {
             }
 
             // did we get a response
-            ResponseHandler handler = (ResponseHandler) session.getHandler();
             if (handler.getCause() != null) {
                 throw new CamelExchangeException("Error occurred in 
ResponseHandler", exchange, handler.getCause());
             } else if (!handler.isMessageReceived()) {
@@ -188,7 +190,7 @@ public class Mina2Producer extends DefaultProducer 
implements ServicePoolAware {
         }
     }
 
-    protected void maybeDisconnectOnDone(Exchange exchange) {
+    protected void maybeDisconnectOnDone(Exchange exchange) throws 
InterruptedException {
         if (session == null) {
             return;
         }
@@ -208,7 +210,16 @@ public class Mina2Producer extends DefaultProducer 
implements ServicePoolAware {
         }
         if (disconnect) {
             LOG.debug("Closing session when complete at address: {}", address);
-            session.close(true);
+            closeSessionIfNeededAndAwaitCloseInHandler(session);
+        }
+    }
+
+    private void closeSessionIfNeededAndAwaitCloseInHandler(IoSession 
sessionToBeClosed) throws InterruptedException {
+        closeLatch = new CountDownLatch(1);
+        if (!sessionToBeClosed.isClosing()) {
+            CloseFuture closeFuture = sessionToBeClosed.closeNow();
+            closeFuture.await(timeout, TimeUnit.MILLISECONDS);
+            closeLatch.await(timeout, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -241,10 +252,9 @@ public class Mina2Producer extends DefaultProducer 
implements ServicePoolAware {
         super.doShutdown();
     }
 
-    private void closeConnection() {
+    private void closeConnection() throws InterruptedException {
         if (session != null) {
-            CloseFuture closeFuture = session.close(true);
-            closeFuture.awaitUninterruptibly();
+            closeSessionIfNeededAndAwaitCloseInHandler(session);
         }
 
         connector.dispose(true);
@@ -255,14 +265,13 @@ public class Mina2Producer extends DefaultProducer 
implements ServicePoolAware {
             setSocketAddress(this.configuration.getProtocol());
         }
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Creating connector to address: {} using connector: {} 
timeout: {} millis.", new Object[]{address, connector, timeout});
+            LOG.debug("Creating connector to address: {} using connector: {} 
timeout: {} millis.", address, connector, timeout);
         }
         // connect and wait until the connection is established
         if (connectorConfig != null) {
             connector.getSessionConfig().setAll(connectorConfig);
         }
 
-        connector.setHandler(new ResponseHandler());
         ConnectFuture future = connector.connect(address);
         future.awaitUninterruptibly();
         session = future.getSession();
@@ -342,7 +351,7 @@ public class Mina2Producer extends DefaultProducer 
implements ServicePoolAware {
             }
             addCodecFactory(service, codecFactory);
             LOG.debug("{}: Using TextLineCodecFactory: {} using encoding: {} 
line delimiter: {}({})",
-                      new Object[]{type, codecFactory, charset, 
configuration.getTextlineDelimiter(), delimiter});
+                    type, codecFactory, charset, 
configuration.getTextlineDelimiter(), delimiter);
             LOG.debug("Encoder maximum line length: {}. Decoder maximum line 
length: {}",
                     codecFactory.getEncoderMaxLineLength(), 
codecFactory.getDecoderMaxLineLength());
         } else {
@@ -502,6 +511,7 @@ public class Mina2Producer extends DefaultProducer 
implements ServicePoolAware {
                 // and could not return a response. We should count down to 
stop waiting for a response
                 countDown();
             }
+            closeLatch.countDown();
         }
 
         @Override
@@ -512,7 +522,11 @@ public class Mina2Producer extends DefaultProducer 
implements ServicePoolAware {
             this.messageReceived = false;
             this.cause = cause;
             if (ioSession != null) {
-                ioSession.close(true);
+                try {
+                    closeSessionIfNeededAndAwaitCloseInHandler(ioSession);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java
 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java
index ba11d11..3f7bb47 100644
--- 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java
+++ 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java
@@ -86,7 +86,7 @@ public class Mina2ClientModeTcpTextlineDelimiterTest extends 
BaseMina2Test {
     private class ServerHandler extends IoHandlerAdapter {
         public void sessionOpened(IoSession session) throws Exception {
             session.write("Hello there!\n");
-            session.close(true);
+            session.closeNow();
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java
 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java
new file mode 100644
index 0000000..6b4b353
--- /dev/null
+++ 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina2;
+
+import java.lang.reflect.Field;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.mina.core.session.IoSession;
+import org.junit.Test;
+
+public class Mina2DisconnectRaceConditionTest extends BaseMina2Test {
+
+    /**
+     * This is a test for issue CAMEL-10024 - the closing must complete before 
we return from the producer
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testCloseSessionWhenCompleteManyTimes() throws Exception {
+        final String endpointUri = 
String.format("mina2:tcp://localhost:%1$s?sync=true&textline=true&disconnect=true&minaLogger=true",
 getPort());
+        Mina2Producer producer = (Mina2Producer) 
context.getEndpoint(endpointUri).createProducer();
+        // Access session to check that the session is really closed
+        Field field = producer.getClass().getDeclaredField("session");
+        field.setAccessible(true);
+
+        for (int i = 0; i < 100; i++) {
+            Exchange e = new DefaultExchange(context, ExchangePattern.InOut);
+            e.getIn().setBody("Chad");
+            producer.process(e);
+            final IoSession ioSession = (IoSession) field.get(producer);
+            assertTrue(ioSession.getCloseFuture().isDone());
+            Object out = e.getOut().getBody();
+            assertEquals("Bye Chad", out);
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            public void configure() throws Exception {
+                
from(String.format("mina2:tcp://localhost:%1$s?sync=true&textline=true", 
getPort())).process(new Processor() {
+
+                    public void process(Exchange exchange) throws Exception {
+                        String body = exchange.getIn().getBody(String.class);
+                        exchange.getOut().setBody("Bye " + body);
+                    }
+                });
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java
 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java
index 63328d5..17a120f 100644
--- 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java
+++ 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java
@@ -171,7 +171,7 @@ public class Mina2EncodingTest extends BaseMina2Test {
 
         Endpoint endpoint = context.getEndpoint(uri);
         Producer producer = endpoint.createProducer();
-        Exchange exchange = producer.createExchange();
+        Exchange exchange = endpoint.createExchange();
         exchange.getIn().setBody(hello);
 
         producer.start();

http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java
 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java
index 512c1f6..d24743f 100644
--- 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java
+++ 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java
@@ -35,7 +35,7 @@ public class Mina2ExchangeTimeOutTest extends BaseMina2Test {
         Endpoint endpoint = 
context.getEndpoint(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=true&timeout=500",
 getPort()));
         Producer producer = endpoint.createProducer();
         producer.start();
-        Exchange exchange = producer.createExchange();
+        Exchange exchange = endpoint.createExchange();
         exchange.getIn().setBody("Hello World");
         try {
             producer.process(exchange);

http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java
 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java
index 25b82dd..7870efc 100644
--- 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java
+++ 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java
@@ -73,7 +73,7 @@ public class Mina2NoResponseFromServerTest extends 
BaseMina2Test {
                 public void encode(IoSession ioSession, Object message, 
ProtocolEncoderOutput out)
                     throws Exception {
                     // close session instead of returning a reply
-                    ioSession.close(true);
+                    ioSession.closeNow();
                 }
 
                 public void dispose(IoSession ioSession) throws Exception {
@@ -89,7 +89,7 @@ public class Mina2NoResponseFromServerTest extends 
BaseMina2Test {
                 public void decode(IoSession ioSession, IoBuffer in,
                                    ProtocolDecoderOutput out) throws Exception 
{
                     // close session instead of returning a reply
-                    ioSession.close(true);
+                    ioSession.closeNow();
                 }
 
                 public void finishDecode(IoSession ioSession, 
ProtocolDecoderOutput protocolDecoderOutput)

http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java
 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java
index c36bade..ab110a0 100644
--- 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java
+++ 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java
@@ -27,9 +27,10 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.mina.transport.socket.SocketConnector;
 import org.junit.Test;
 
-import static org.easymock.classextension.EasyMock.createMock;
-import static org.easymock.classextension.EasyMock.replay;
-import static org.easymock.classextension.EasyMock.verify;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
 
 /**
  * Unit testing for using a MinaProducer that it can shutdown properly 
(CAMEL-395)

http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java
 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java
index 04579fa..6267dc0 100644
--- 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java
+++ 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java
@@ -30,7 +30,7 @@ public class Mina2ReverseProtocolHandler extends 
IoHandlerAdapter {
     public void exceptionCaught(IoSession session, Throwable cause) {
         cause.printStackTrace();
         // Close connection when unexpected exception is caught.
-        session.close(true);
+        session.closeNow();
     }
 
     public void messageReceived(IoSession session, Object message) {

http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java
 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java
index 282d471..74b5683 100644
--- 
a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java
+++ 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java
@@ -53,7 +53,7 @@ public class Mina2TransferExchangeOptionTest extends 
BaseMina2Test {
     private Exchange sendExchange(boolean setException) throws Exception {
         Endpoint endpoint = 
context.getEndpoint(String.format("mina2:tcp://localhost:%1$s?sync=true&encoding=UTF-8&transferExchange=true",
 getPort()));
         Producer producer = endpoint.createProducer();
-        Exchange exchange = producer.createExchange();
+        Exchange exchange = endpoint.createExchange();
         //Exchange exchange = endpoint.createExchange();
 
         Message message = exchange.getIn();

Reply via email to