Author: raulk Date: Wed Jan 9 23:52:56 2013 New Revision: 1431155 URL: http://svn.apache.org/viewvc?rev=1431155&view=rev Log: CAMEL-5865 Enhanced concurrent consumers support for JMS producers using Temp Reply Queue for replies
Added: camel/branches/camel-2.10.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java - copied unchanged from r1431152, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java Modified: camel/branches/camel-2.10.x/ (props changed) camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Propchange: camel/branches/camel-2.10.x/ ------------------------------------------------------------------------------ Merged /camel/trunk:r1431152 Modified: camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1431155&r1=1431154&r2=1431155&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original) +++ camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Wed Jan 9 23:52:56 2013 @@ -16,7 +16,10 @@ */ package org.apache.camel.component.jms.reply; +import java.util.concurrent.atomic.AtomicBoolean; + import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; @@ -37,11 +40,23 @@ import org.springframework.jms.support.d * @version */ public class TemporaryQueueReplyManager extends ReplyManagerSupport { - + + final TemporaryReplyQueueDestinationResolver destResolver = new TemporaryReplyQueueDestinationResolver(); + public TemporaryQueueReplyManager(CamelContext camelContext) { super(camelContext); } + @Override + public Destination getReplyTo() { + try { + destResolver.destinationReady(); + } catch (InterruptedException e) { + log.warn("Interrupted while waiting for JMSReplyTo destination refresh", e); + } + return super.getReplyTo(); + } + public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout) { // add to correlation map @@ -90,15 +105,7 @@ public class TemporaryQueueReplyManager DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint); answer.setDestinationName("temporary"); - answer.setDestinationResolver(new DestinationResolver() { - public Destination resolveDestinationName(Session session, String destinationName, - boolean pubSubDomain) throws JMSException { - // use a temporary queue to gather the reply message - TemporaryQueue queue = session.createTemporaryQueue(); - setReplyTo(queue); - return queue; - } - }); + answer.setDestinationResolver(destResolver); answer.setAutoStartup(true); if (endpoint.getMaxMessagesPerTask() >= 0) { answer.setMaxMessagesPerTask(endpoint.getMaxMessagesPerTask()); @@ -113,6 +120,9 @@ public class TemporaryQueueReplyManager answer.setMaxConcurrentConsumers(endpoint.getMaxConcurrentConsumers()); } answer.setConnectionFactory(endpoint.getConnectionFactory()); + // we use CACHE_CONSUMER to cling to the consumer as long as we can, since we can only consume + // msgs from the JMS Connection that created the temp destination in the first place + answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER); String clientId = endpoint.getClientId(); if (clientId != null) { clientId += ".CamelReplyManager"; @@ -121,11 +131,10 @@ public class TemporaryQueueReplyManager // we cannot do request-reply over JMS with transaction answer.setSessionTransacted(false); - + // other optional properties - if (endpoint.getExceptionListener() != null) { - answer.setExceptionListener(endpoint.getExceptionListener()); - } + answer.setExceptionListener(new TemporaryReplyQueueExceptionListener(destResolver, endpoint.getExceptionListener())); + if (endpoint.getErrorHandler() != null) { answer.setErrorHandler(endpoint.getErrorHandler()); } else { @@ -144,8 +153,9 @@ public class TemporaryQueueReplyManager answer.setTaskExecutor(endpoint.getTaskExecutor()); } - // setup a bean name which is used ny Spring JMS as the thread name - String name = "TemporaryQueueReplyManager[" + answer.getDestinationName() + "]"; + // setup a bean name which is used by Spring JMS as the thread name + // use the name of the request destination + String name = "TemporaryQueueReplyManager[" + endpoint.getDestinationName() + "]"; answer.setBeanName(name); if (answer.getConcurrentConsumers() > 1) { @@ -156,4 +166,60 @@ public class TemporaryQueueReplyManager return answer; } + private final class TemporaryReplyQueueExceptionListener implements ExceptionListener { + private final TemporaryReplyQueueDestinationResolver destResolver; + private final ExceptionListener delegate; + + private TemporaryReplyQueueExceptionListener(TemporaryReplyQueueDestinationResolver destResolver, + ExceptionListener delegate) { + this.destResolver = destResolver; + this.delegate = delegate; + } + + @Override + public void onException(JMSException exception) { + // capture exceptions, and schedule a refresh of the ReplyTo destination + log.warn("Exception inside the DMLC for Temporary ReplyTo Queue for destination " + endpoint.getDestinationName() + + ", refreshing ReplyTo destination", exception); + destResolver.scheduleRefresh(); + // serve as a proxy for any exception listener the user may have set explicitly + if (delegate != null) { + delegate.onException(exception); + } + } + + } + + private final class TemporaryReplyQueueDestinationResolver implements DestinationResolver { + private TemporaryQueue queue; + private AtomicBoolean refreshWanted = new AtomicBoolean(false); + + public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) + throws JMSException { + // use a temporary queue to gather the reply message + synchronized (refreshWanted) { + if (queue == null || refreshWanted.compareAndSet(true, false)) { + queue = session.createTemporaryQueue(); + setReplyTo(queue); + log.debug("Refreshed Temporary ReplyTo Queue. New queue: " + queue.getQueueName()); + refreshWanted.notifyAll(); + } + } + return queue; + } + + public void scheduleRefresh() { + refreshWanted.set(true); + } + + public void destinationReady() throws InterruptedException { + if (refreshWanted.get()) { + synchronized (refreshWanted) { + log.debug("Waiting for new Temp ReplyTo destination to be assigned to continue"); + refreshWanted.wait(); + } + } + } + } + }