Author: raulk Date: Wed Jan 9 23:49:26 2013 New Revision: 1431152 URL: http://svn.apache.org/viewvc?rev=1431152&view=rev Log: CAMEL-5865 Enhanced concurrent consumers support for JMS producers using Temp Reply Queue for replies
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1431152&r1=1431151&r2=1431152&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Wed Jan 9 23:49:26 2013 @@ -16,7 +16,10 @@ */ package org.apache.camel.component.jms.reply; +import java.util.concurrent.atomic.AtomicBoolean; + import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; @@ -37,11 +40,23 @@ import org.springframework.jms.support.d * @version */ public class TemporaryQueueReplyManager extends ReplyManagerSupport { - + + final TemporaryReplyQueueDestinationResolver destResolver = new TemporaryReplyQueueDestinationResolver(); + public TemporaryQueueReplyManager(CamelContext camelContext) { super(camelContext); } + @Override + public Destination getReplyTo() { + try { + destResolver.destinationReady(); + } catch (InterruptedException e) { + log.warn("Interrupted while waiting for JMSReplyTo destination refresh", e); + } + return super.getReplyTo(); + } + public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout) { // add to correlation map @@ -90,15 +105,7 @@ public class TemporaryQueueReplyManager DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint); answer.setDestinationName("temporary"); - answer.setDestinationResolver(new DestinationResolver() { - public Destination resolveDestinationName(Session session, String destinationName, - boolean pubSubDomain) throws JMSException { - // use a temporary queue to gather the reply message - TemporaryQueue queue = session.createTemporaryQueue(); - setReplyTo(queue); - return queue; - } - }); + answer.setDestinationResolver(destResolver); answer.setAutoStartup(true); if (endpoint.getMaxMessagesPerTask() >= 0) { answer.setMaxMessagesPerTask(endpoint.getMaxMessagesPerTask()); @@ -113,6 +120,9 @@ public class TemporaryQueueReplyManager answer.setMaxConcurrentConsumers(endpoint.getMaxConcurrentConsumers()); } answer.setConnectionFactory(endpoint.getConnectionFactory()); + // we use CACHE_CONSUMER to cling to the consumer as long as we can, since we can only consume + // msgs from the JMS Connection that created the temp destination in the first place + answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER); String clientId = endpoint.getClientId(); if (clientId != null) { clientId += ".CamelReplyManager"; @@ -121,11 +131,10 @@ public class TemporaryQueueReplyManager // we cannot do request-reply over JMS with transaction answer.setSessionTransacted(false); - + // other optional properties - if (endpoint.getExceptionListener() != null) { - answer.setExceptionListener(endpoint.getExceptionListener()); - } + answer.setExceptionListener(new TemporaryReplyQueueExceptionListener(destResolver, endpoint.getExceptionListener())); + if (endpoint.getErrorHandler() != null) { answer.setErrorHandler(endpoint.getErrorHandler()); } else { @@ -144,8 +153,9 @@ public class TemporaryQueueReplyManager answer.setTaskExecutor(endpoint.getTaskExecutor()); } - // setup a bean name which is used ny Spring JMS as the thread name - String name = "TemporaryQueueReplyManager[" + answer.getDestinationName() + "]"; + // setup a bean name which is used by Spring JMS as the thread name + // use the name of the request destination + String name = "TemporaryQueueReplyManager[" + endpoint.getDestinationName() + "]"; answer.setBeanName(name); if (answer.getConcurrentConsumers() > 1) { @@ -156,4 +166,60 @@ public class TemporaryQueueReplyManager return answer; } + private final class TemporaryReplyQueueExceptionListener implements ExceptionListener { + private final TemporaryReplyQueueDestinationResolver destResolver; + private final ExceptionListener delegate; + + private TemporaryReplyQueueExceptionListener(TemporaryReplyQueueDestinationResolver destResolver, + ExceptionListener delegate) { + this.destResolver = destResolver; + this.delegate = delegate; + } + + @Override + public void onException(JMSException exception) { + // capture exceptions, and schedule a refresh of the ReplyTo destination + log.warn("Exception inside the DMLC for Temporary ReplyTo Queue for destination " + endpoint.getDestinationName() + + ", refreshing ReplyTo destination", exception); + destResolver.scheduleRefresh(); + // serve as a proxy for any exception listener the user may have set explicitly + if (delegate != null) { + delegate.onException(exception); + } + } + + } + + private final class TemporaryReplyQueueDestinationResolver implements DestinationResolver { + private TemporaryQueue queue; + private AtomicBoolean refreshWanted = new AtomicBoolean(false); + + public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) + throws JMSException { + // use a temporary queue to gather the reply message + synchronized (refreshWanted) { + if (queue == null || refreshWanted.compareAndSet(true, false)) { + queue = session.createTemporaryQueue(); + setReplyTo(queue); + log.debug("Refreshed Temporary ReplyTo Queue. New queue: " + queue.getQueueName()); + refreshWanted.notifyAll(); + } + } + return queue; + } + + public void scheduleRefresh() { + refreshWanted.set(true); + } + + public void destinationReady() throws InterruptedException { + if (refreshWanted.get()) { + synchronized (refreshWanted) { + log.debug("Waiting for new Temp ReplyTo destination to be assigned to continue"); + refreshWanted.wait(); + } + } + } + } + } Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java?rev=1431152&view=auto ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java (added) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java Wed Jan 9 23:49:26 2013 @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.ConnectionFactory; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnection; +import org.apache.activemq.pool.PooledConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * Reliability tests for JMS TempQueue Reply Manager with multiple consumers. + * @version + */ +public class JmsRequestReplyTempQueueMultipleConsumersTest extends CamelTestSupport { + + private Map<String, AtomicInteger> msgsPerThread = new ConcurrentHashMap<String, AtomicInteger>(); + private BrokerService broker; + private PooledConnectionFactory connectionFactory; + + @Test + public void testMultipleConsumingThreads() throws Exception { + doSendMessages(1000, 5); + assertTrue("Expected multiple consuming threads, but only found: " + msgsPerThread.keySet().size(), + msgsPerThread.keySet().size() > 1); + } + + @Test + public void testTempQueueRefreshed() throws Exception { + doSendMessages(500, 5); + connectionFactory.clear(); + doSendMessages(100, 5); + connectionFactory.clear(); + doSendMessages(100, 5); + connectionFactory.clear(); + doSendMessages(100, 5); + connectionFactory.clear(); + doSendMessages(100, 5); + connectionFactory.clear(); + doSendMessages(100, 5); + } + + private void doSendMessages(int files, int poolSize) throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(files); + getMockEndpoint("mock:result").expectsNoDuplicates(body()); + + ExecutorService executor = Executors.newFixedThreadPool(poolSize); + for (int i = 0; i < files; i++) { + final int index = i; + executor.submit(new Callable<Object>() { + public Object call() throws Exception { + template.sendBody("seda:start", "Message " + index); + return null; + } + }); + } + + assertMockEndpointsSatisfied(); + resetMocks(); + executor.shutdownNow(); + } + + public void startBroker() throws Exception { + String brokerName = "test-broker-" + System.currentTimeMillis(); + String brokerUri = "vm://" + brokerName; + broker = new BrokerService(); + broker.setBrokerName(brokerName); + broker.setBrokerId(brokerName); + broker.addConnector(brokerUri); + broker.setPersistent(false); + broker.start(); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + //startBroker(); + + connectionFactory = (PooledConnectionFactory) CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent("jms", jmsComponentAutoAcknowledge(connectionFactory)); + + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:start") + .inOut("jms:queue:foo?concurrentConsumers=10&maxConcurrentConsumers=20&recoveryInterval=10") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + String threadName = Thread.currentThread().getName(); + synchronized (msgsPerThread) { + AtomicInteger count = msgsPerThread.get(threadName); + if (count == null) { + count = new AtomicInteger(0); + msgsPerThread.put(threadName, count); + } + count.incrementAndGet(); + } + } + }) + .to("mock:result"); + + from("jms:queue:foo?concurrentConsumers=20&recoveryInterval=10") + .setBody(simple("Reply >>> ${body}")); + } + }; + } + +}