Author: davsclaus Date: Wed Jul 21 07:54:37 2010 New Revision: 966129 URL: http://svn.apache.org/viewvc?rev=966129&view=rev Log: CAMEL-2970: Preperaing JmsProducer to support async routing engine for InOut MEPs.
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=966129&r1=966128&r2=966129&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Wed Jul 21 07:54:37 2010 @@ -54,9 +54,6 @@ import org.springframework.transaction.P */ @ManagedResource(description = "Managed JMS Endpoint") public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, ManagementAware<JmsEndpoint>, MultipleConsumersSupport { - private static final int DEFAULT_THREADPOOL_SIZE = 100; - - private ScheduledExecutorService scheduledExecutorService; private HeaderFilterStrategy headerFilterStrategy; private boolean pubSubDomain; private JmsBinding binding; @@ -65,6 +62,7 @@ public class JmsEndpoint extends Default private String selector; private JmsConfiguration configuration; private Requestor requestor; + private ScheduledExecutorService requestorExecutorService; public JmsEndpoint() { this(null, null); @@ -288,7 +286,7 @@ public class JmsEndpoint extends Default public synchronized Requestor getRequestor() throws Exception { if (requestor == null) { - requestor = new Requestor(getConfiguration(), getScheduledExecutorService()); + requestor = new Requestor(getConfiguration(), getRequestorExecutorService()); requestor.start(); } return requestor; @@ -298,18 +296,6 @@ public class JmsEndpoint extends Default this.requestor = requestor; } - public synchronized ScheduledExecutorService getScheduledExecutorService() { - if (scheduledExecutorService == null) { - scheduledExecutorService = getCamelContext().getExecutorServiceStrategy() - .newScheduledThreadPool(this, getEndpointUri(), DEFAULT_THREADPOOL_SIZE); - } - return scheduledExecutorService; - } - - public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { - this.scheduledExecutorService = scheduledExecutorService; - } - public boolean isPubSubDomain() { return pubSubDomain; } @@ -357,6 +343,13 @@ public class JmsEndpoint extends Default return template; } + protected synchronized ScheduledExecutorService getRequestorExecutorService() { + if (requestorExecutorService == null) { + requestorExecutorService = getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, "JmsRequesterTimeoutTask", 1); + } + return requestorExecutorService; + } + // Delegated properties from the configuration //------------------------------------------------------------------------- @ManagedAttribute Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=966129&r1=966128&r2=966129&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Wed Jul 21 07:54:37 2010 @@ -26,6 +26,7 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.FailedToCreateProducerException; @@ -37,7 +38,7 @@ import org.apache.camel.component.jms.re import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback; import org.apache.camel.component.jms.requestor.PersistentReplyToRequestor; import org.apache.camel.component.jms.requestor.Requestor; -import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.util.UuidGenerator; import org.apache.camel.util.ValueHolder; import org.apache.commons.logging.Log; @@ -48,7 +49,7 @@ import org.springframework.jms.core.Mess /** * @version $Revision$ */ -public class JmsProducer extends DefaultProducer { +public class JmsProducer extends DefaultAsyncProducer { private static final transient Log LOG = LogFactory.getLog(JmsProducer.class); private RequestorAffinity affinity; private final JmsEndpoint endpoint; @@ -100,11 +101,11 @@ public class JmsProducer extends Default try { JmsConfiguration c = endpoint.getConfiguration(); if (c.getReplyTo() != null) { - requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(), endpoint.getScheduledExecutorService()); + requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(), endpoint.getRequestorExecutorService()); requestor.start(); } else { if (affinity == RequestorAffinity.PER_PRODUCER) { - requestor = new Requestor(endpoint.getConfiguration(), endpoint.getScheduledExecutorService()); + requestor = new Requestor(endpoint.getConfiguration(), endpoint.getRequestorExecutorService()); requestor.start(); } else if (affinity == RequestorAffinity.PER_ENDPOINT) { requestor = endpoint.getRequestor(); @@ -141,17 +142,17 @@ public class JmsProducer extends Default super.doStop(); } - public void process(final Exchange exchange) { + public boolean process(Exchange exchange, AsyncCallback callback) { if (!endpoint.isDisableReplyTo() && exchange.getPattern().isOutCapable()) { // in out requires a bit more work than in only - processInOut(exchange); + return processInOut(exchange, callback); } else { // in only - processInOnly(exchange); + return processInOnly(exchange, callback); } } - protected void processInOut(final Exchange exchange) { + protected boolean processInOut(final Exchange exchange, final AsyncCallback callback) { final org.apache.camel.Message in = exchange.getIn(); String destinationName = in.getHeader(JmsConstants.JMS_DESTINATION_NAME, String.class); @@ -190,7 +191,7 @@ public class JmsProducer extends Default } final ValueHolder<FutureTask> futureHolder = new ValueHolder<FutureTask>(); - final DeferredMessageSentCallback callback = msgIdAsCorrId ? deferredRequestReplyMap.createDeferredMessageSentCallback() : null; + final DeferredMessageSentCallback jmsCallback = msgIdAsCorrId ? deferredRequestReplyMap.createDeferredMessageSentCallback() : null; MessageCreator messageCreator = new MessageCreator() { public Message createMessage(Session session) throws JMSException { @@ -201,18 +202,24 @@ public class JmsProducer extends Default FutureTask future; future = (!msgIdAsCorrId) ? requestor.getReceiveFuture(message.getJMSCorrelationID(), endpoint.getConfiguration().getRequestTimeout()) - : requestor.getReceiveFuture(callback); + : requestor.getReceiveFuture(jmsCallback); futureHolder.set(future); return message; } }; - doSend(true, destinationName, destination, messageCreator, callback); + doSend(true, destinationName, destination, messageCreator, jmsCallback); // after sending then set the OUT message id to the JMSMessageID so its identical setMessageId(exchange); + // now we should routing asynchronously to not block while waiting for the reply + // TODO: + // we need a thread pool to use for continue routing messages, just like a seda consumer + // and we need options to configure it as well so you can indicate how many threads to use + // TODO: Also consider requestTimeout + // lets wait and return the response long requestTimeout = endpoint.getConfiguration().getRequestTimeout(); try { @@ -268,9 +275,12 @@ public class JmsProducer extends Default exchange.setException(e); } + // TODO: should be async + callback.done(true); + return true; } - protected void processInOnly(final Exchange exchange) { + protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) { final org.apache.camel.Message in = exchange.getIn(); String destinationName = in.getHeader(JmsConstants.JMS_DESTINATION_NAME, String.class); @@ -321,6 +331,10 @@ public class JmsProducer extends Default // after sending then set the OUT message id to the JMSMessageID so its identical setMessageId(exchange); + + // we are synchronous so return true + callback.done(true); + return true; } /** Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java?rev=966129&r1=966128&r2=966129&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java Wed Jul 21 07:54:37 2010 @@ -20,7 +20,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledExecutorService; - import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; @@ -35,7 +34,6 @@ import org.apache.camel.component.jms.re import org.apache.camel.impl.ServiceSupport; import org.apache.camel.util.DefaultTimeoutMap; import org.apache.camel.util.TimeoutMap; -import org.apache.camel.util.UuidGenerator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.core.task.TaskExecutor; @@ -49,9 +47,7 @@ import org.springframework.jms.support.d */ public class Requestor extends ServiceSupport implements MessageListener { private static final transient Log LOG = LogFactory.getLog(Requestor.class); - private static UuidGenerator uuidGenerator; private final JmsConfiguration configuration; - private ScheduledExecutorService executorService; private AbstractMessageListenerContainer listenerContainer; private TimeoutMap<String, Object> requestMap; private Map<JmsProducer, DeferredRequestReplyMap> producerDeferredRequestReplyMap; @@ -61,14 +57,15 @@ public class Requestor extends ServiceSu private long maxRequestTimeout = -1; private long replyToResolverTimeout = 5000; + // TODO: Use a Task queue to transfer replies arriving in onMessage + // instead of using the FutureHandle to support async routing public Requestor(JmsConfiguration configuration, ScheduledExecutorService executorService) { this.configuration = configuration; - this.executorService = executorService; - requestMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis()); - producerDeferredRequestReplyMap = new HashMap<JmsProducer, DeferredRequestReplyMap>(); - deferredRequestMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis()); - deferredReplyMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis()); + this.requestMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis()); + this.producerDeferredRequestReplyMap = new HashMap<JmsProducer, DeferredRequestReplyMap>(); + this.deferredRequestMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis()); + this.deferredReplyMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis()); } public synchronized DeferredRequestReplyMap getDeferredRequestReplyMap(JmsProducer producer) { @@ -261,13 +258,6 @@ public class Requestor extends ServiceSu return answer; } - public static synchronized UuidGenerator getUuidGenerator() { - if (uuidGenerator == null) { - uuidGenerator = UuidGenerator.get(); - } - return uuidGenerator; - } - protected JmsConfiguration getConfiguration() { return configuration; }