Author: davsclaus Date: Fri Apr 27 15:02:45 2012 New Revision: 1331455 URL: http://svn.apache.org/viewvc?rev=1331455&view=rev Log: CAMEL-4309: Added option asyncStopListener to camel-jms to stop the JMS listener container asynchronously using a separate thread
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java - copied, changed from r1331369, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java - copied, changed from r1331369, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1331455&r1=1331454&r2=1331455&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Fri Apr 27 15:02:45 2012 @@ -54,7 +54,7 @@ public class JmsComponent extends Defaul private ApplicationContext applicationContext; private QueueBrowseStrategy queueBrowseStrategy; private HeaderFilterStrategy headerFilterStrategy = new JmsHeaderFilterStrategy(); - private ExecutorService asyncStartExecutorService; + private ExecutorService asyncStartStopExecutorService; public JmsComponent() { } @@ -324,6 +324,10 @@ public class JmsComponent extends Defaul 
getConfiguration().setAsyncStartListener(asyncStartListener); } + public void setAsyncStopListener(boolean asyncStopListener) { + getConfiguration().setAsyncStopListener(asyncStopListener); + } + public void setForceSendOriginalMessage(boolean forceSendOriginalMessage) { getConfiguration().setForceSendOriginalMessage(forceSendOriginalMessage); } @@ -404,20 +408,20 @@ public class JmsComponent extends Defaul @Override protected void doShutdown() throws Exception { - if (asyncStartExecutorService != null) { - getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartExecutorService); - asyncStartExecutorService = null; + if (asyncStartStopExecutorService != null) { + getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService); + asyncStartStopExecutorService = null; } super.doShutdown(); } - protected synchronized ExecutorService getAsyncStartExecutorService() { - if (asyncStartExecutorService == null) { + protected synchronized ExecutorService getAsyncStartStopExecutorService() { + if (asyncStartStopExecutorService == null) { // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread // for each task, and the thread pool will shrink when no more tasks running - asyncStartExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartListener"); + asyncStartStopExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener"); } - return asyncStartExecutorService; + return asyncStartStopExecutorService; } @Override Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1331455&r1=1331454&r2=1331455&view=diff ============================================================================== --- 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Fri Apr 27 15:02:45 2012 @@ -124,7 +124,8 @@ public class JmsConfiguration implements private boolean transferException; private boolean testConnectionOnStartup; private boolean asyncStartListener; - // if the message is a JmsMessage and mapJmsMessage=false, force the + private boolean asyncStopListener; + // if the message is a JmsMessage and mapJmsMessage=false, force the // producer to send the javax.jms.Message body to the next JMS destination private boolean forceSendOriginalMessage; // to force disabling time to live (works in both in-only or in-out mode) @@ -1200,6 +1201,14 @@ public class JmsConfiguration implements this.asyncStartListener = asyncStartListener; } + public boolean isAsyncStopListener() { + return asyncStopListener; + } + + public void setAsyncStopListener(boolean asyncStopListener) { + this.asyncStopListener = asyncStopListener; + } + public boolean isTestConnectionOnStartup() { return testConnectionOnStartup; } Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1331455&r1=1331454&r2=1331455&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Fri Apr 27 15:02:45 2012 @@ -115,7 +115,7 @@ public class JmsConsumer extends Default } if (getEndpoint().getConfiguration().isAsyncStartListener()) { - getEndpoint().getAsyncStartExecutorService().submit(new Runnable() { + 
getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() { @Override public void run() { try { @@ -151,19 +151,42 @@ public class JmsConsumer extends Default } } - @Override - protected void doStop() throws Exception { + protected void stopAndDestroyListenerContainer() { if (listenerContainer != null) { listenerContainer.stop(); listenerContainer.destroy(); - // TODO: The async destroy code does not work well see https://issues.apache.org/jira/browse/CAMEL-4309 - // getEndpoint().destroyMessageListenerContainer(listenerContainer); } - // null container and listener so they are fully re created if this consumer is restarted // then we will use updated configuration from jms endpoint that may have been managed using JMX listenerContainer = null; messageListener = null; + } + + @Override + protected void doStop() throws Exception { + if (listenerContainer != null) { + + if (getEndpoint().getConfiguration().isAsyncStopListener()) { + getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() { + @Override + public void run() { + try { + stopAndDestroyListenerContainer(); + } catch (Throwable e) { + log.warn("Error stopping listener container on destination: " + getDestinationName() + ". 
This exception will be ignored.", e); + } + } + + @Override + public String toString() { + return "AsyncStopListenerTask[" + getDestinationName() + "]"; + } + }); + } else { + stopAndDestroyListenerContainer(); + } + } + super.doStop(); } Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1331455&r1=1331454&r2=1331455&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Fri Apr 27 15:02:45 2012 @@ -469,12 +469,12 @@ public class JmsEndpoint extends Default return replyManagerExecutorService; } - protected ExecutorService getAsyncStartExecutorService() { + protected ExecutorService getAsyncStartStopExecutorService() { if (getComponent() == null) { - throw new IllegalStateException("AsyncStartListener requires JmsComponent to be configured on this endpoint: " + this); + throw new IllegalStateException("AsyncStartStopListener requires JmsComponent to be configured on this endpoint: " + this); } // use shared thread pool from component - return getComponent().getAsyncStartExecutorService(); + return getComponent().getAsyncStartStopExecutorService(); } /** @@ -1096,6 +1096,16 @@ public class JmsEndpoint extends Default } @ManagedAttribute + public void setAsyncStopListener(boolean asyncStopListener) { + configuration.setAsyncStopListener(asyncStopListener); + } + + @ManagedAttribute + public boolean isAsyncStopListener() { + return configuration.isAsyncStopListener(); + } + + @ManagedAttribute public String getReplyToType() { if (configuration.getReplyToType() != null) { return configuration.getReplyToType().name(); Copied: 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java (from r1331369, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java&r1=1331369&r2=1331455&rev=1331455&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java (original) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java Fri Apr 27 15:02:45 2012 @@ -31,7 +31,7 @@ import static org.apache.camel.component /** * Testing with async start listener */ -public class JmsAsyncStartListenerTest extends CamelTestSupport { +public class JmsAsyncStartStopListenerTest extends CamelTestSupport { protected String componentName = "activemq"; @@ -54,6 +54,7 @@ public class JmsAsyncStartListenerTest e ConnectionFactory connectionFactory = CamelJmsTestHelper.createPersistentConnectionFactory(); JmsComponent jms = jmsComponentAutoAcknowledge(connectionFactory); jms.setAsyncStartListener(true); + jms.setAsyncStopListener(true); camelContext.addComponent(componentName, jms); return camelContext; Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java (from r1331369, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java) URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java&r1=1331369&r2=1331455&rev=1331455&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java (original) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java Fri Apr 27 15:02:45 2012 @@ -29,14 +29,14 @@ import org.junit.Test; import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; /** - * Testing with async start listener + * Testing with async stop listener */ -public class JmsAsyncStartListenerTest extends CamelTestSupport { +public class JmsAsyncStopListenerTest extends CamelTestSupport { protected String componentName = "activemq"; @Test - public void testAsyncStartListener() throws Exception { + public void testAsyncStopListener() throws Exception { MockEndpoint result = getMockEndpoint("mock:result"); result.expectedMessageCount(2); @@ -53,7 +53,7 @@ public class JmsAsyncStartListenerTest e // so we need a persistent store in case no active consumers when we send the messages ConnectionFactory connectionFactory = CamelJmsTestHelper.createPersistentConnectionFactory(); JmsComponent jms = jmsComponentAutoAcknowledge(connectionFactory); - jms.setAsyncStartListener(true); + jms.setAsyncStopListener(true); camelContext.addComponent(componentName, jms); return camelContext;