Repository: camel Updated Branches: refs/heads/master e816247d6 -> e319c9e08
CAMEL-8711: Expose JMS session to Camel Message so end users can access it and use it for client ack mode etc. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e00e0d65 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e00e0d65 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e00e0d65 Branch: refs/heads/master Commit: e00e0d6599b01733c270f3053e23118d35ea0881 Parents: e816247 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed May 6 11:20:43 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed May 6 11:20:43 2015 +0200 ---------------------------------------------------------------------- .../jms/DefaultQueueBrowseStrategy.java | 4 +-- .../component/jms/EndpointMessageListener.java | 12 +++++--- .../apache/camel/component/jms/JmsEndpoint.java | 5 +-- .../apache/camel/component/jms/JmsMessage.java | 32 ++++++++++++++------ .../camel/component/jms/JmsPollingConsumer.java | 3 +- .../component/jms/reply/QueueReplyManager.java | 4 +-- .../camel/component/jms/reply/ReplyHandler.java | 6 ++-- .../camel/component/jms/reply/ReplyHolder.java | 17 +++++++++-- .../camel/component/jms/reply/ReplyManager.java | 4 +-- .../jms/reply/ReplyManagerSupport.java | 12 +++++--- .../jms/reply/TemporaryQueueReplyHandler.java | 5 +-- .../jms/reply/TemporaryQueueReplyManager.java | 4 +-- 12 files changed, 72 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java index 3258d61..45f5847 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java @@ -51,7 +51,7 @@ public class DefaultQueueBrowseStrategy implements QueueBrowseStrategy { Enumeration<?> iter = browser.getEnumeration(); for (int i = 0; i < size && iter.hasMoreElements(); i++) { Message message = (Message) iter.nextElement(); - Exchange exchange = endpoint.createExchange(message); + Exchange exchange = endpoint.createExchange(message, session); answer.add(exchange); } return answer; @@ -71,7 +71,7 @@ public class DefaultQueueBrowseStrategy implements QueueBrowseStrategy { Enumeration<?> iter = browser.getEnumeration(); for (int i = 0; i < size && iter.hasMoreElements(); i++) { Message message = (Message) iter.nextElement(); - Exchange exchange = endpoint.createExchange(message); + Exchange exchange = endpoint.createExchange(message, session); answer.add(exchange); } return answer; http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java index f2afc60..3980128 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsOperations; import org.springframework.jms.core.MessageCreator; +import org.springframework.jms.listener.SessionAwareMessageListener; import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException; @@ -46,7 +47,7 @@ import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException; * * @version */ -public class EndpointMessageListener implements MessageListener { +public class EndpointMessageListener implements SessionAwareMessageListener { private static final Logger LOG = LoggerFactory.getLogger(EndpointMessageListener.class); private final JmsEndpoint endpoint; private final AsyncProcessor processor; @@ -62,7 +63,8 @@ public class EndpointMessageListener implements MessageListener { this.processor = AsyncProcessorConverterHelper.convert(processor); } - public void onMessage(final Message message) { + @Override + public void onMessage(Message message, Session session) throws JMSException { LOG.trace("onMessage START"); LOG.debug("{} consumer received JMS message: {}", endpoint, message); @@ -75,7 +77,7 @@ public class EndpointMessageListener implements MessageListener { // and disableReplyTo hasn't been explicit enabled sendReply = replyDestination != null && !disableReplyTo; - final Exchange exchange = createExchange(message, replyDestination); + final Exchange exchange = createExchange(message, session, replyDestination); if (eagerLoadingOfProperties) { exchange.getIn().getHeaders(); } @@ -233,11 +235,11 @@ public class EndpointMessageListener implements MessageListener { } } - public Exchange createExchange(Message message, Object replyDestination) { + public Exchange createExchange(Message message, Session session, Object replyDestination) { Exchange exchange = endpoint.createExchange(); JmsBinding binding = getBinding(); exchange.setProperty(Exchange.BINDING, binding); - exchange.setIn(new JmsMessage(message, binding)); + exchange.setIn(new JmsMessage(message, session, binding)); // lets set to an InOut if we have some kind of reply-to destination if (replyDestination != null && !disableReplyTo) { http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java index ff9bc32..6389226 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java @@ -25,6 +25,7 @@ import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.Topic; @@ -292,9 +293,9 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy return exchange; } - public Exchange createExchange(Message message) { + public Exchange createExchange(Message message, Session session) { Exchange exchange = createExchange(getExchangePattern()); - exchange.setIn(new JmsMessage(message, getBinding())); + exchange.setIn(new JmsMessage(message, session, getBinding())); return exchange; } http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java index 45274cf..1dfbd48 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java @@ -22,6 +22,7 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Queue; +import javax.jms.Session; import javax.jms.Topic; import org.apache.camel.RuntimeExchangeException; @@ -39,10 +40,12 @@ import org.slf4j.LoggerFactory; public class JmsMessage extends DefaultMessage { private static final Logger LOG = LoggerFactory.getLogger(JmsMessage.class); private Message jmsMessage; + private Session jmsSession; private JmsBinding binding; - public JmsMessage(Message jmsMessage, JmsBinding binding) { + public JmsMessage(Message jmsMessage, Session jmsSession, JmsBinding binding) { setJmsMessage(jmsMessage); + setJmsSession(jmsSession); setBinding(binding); } @@ -98,13 +101,6 @@ public class JmsMessage extends DefaultMessage { } } - /** - * Returns the underlying JMS message - */ - public Message getJmsMessage() { - return jmsMessage; - } - public JmsBinding getBinding() { if (binding == null) { binding = ExchangeHelper.getBinding(getExchange(), JmsBinding.class); @@ -116,6 +112,13 @@ public class JmsMessage extends DefaultMessage { this.binding = binding; } + /** + * Returns the underlying JMS message + */ + public Message getJmsMessage() { + return jmsMessage; + } + public void setJmsMessage(Message jmsMessage) { if (jmsMessage != null) { try { @@ -127,6 +130,17 @@ public class JmsMessage extends DefaultMessage { this.jmsMessage = jmsMessage; } + /** + * Returns the underlying JMS session + */ + public Session getJmsSession() { + return jmsSession; + } + + public void setJmsSession(Session jmsSession) { + this.jmsSession = jmsSession; + } + @Override public void setBody(Object body) { super.setBody(body); @@ -186,7 +200,7 @@ public class JmsMessage extends DefaultMessage { @Override public JmsMessage newInstance() { - return new JmsMessage(null, binding); + return new JmsMessage(null, null, binding); } /** http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPollingConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPollingConsumer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPollingConsumer.java index 2df30cf..cbb74b0 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPollingConsumer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPollingConsumer.java @@ -56,6 +56,7 @@ public class JmsPollingConsumer extends PollingConsumerSupport implements Servic } public Exchange receive(long timeout) { + // TODO: use api so we can get hold of session setReceiveTimeout(timeout); Message message; // using the selector @@ -65,7 +66,7 @@ public class JmsPollingConsumer extends PollingConsumerSupport implements Servic message = template.receive(); } if (message != null) { - return getEndpoint().createExchange(message); + return getEndpoint().createExchange(message, null); } return null; } http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java index 07ddfad..1b72680 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java @@ -65,7 +65,7 @@ public class QueueReplyManager extends ReplyManagerSupport { correlation.put(newCorrelationId, handler, requestTimeout); } - protected void handleReplyMessage(String correlationID, Message message) { + protected void handleReplyMessage(String correlationID, Message message, Session session) { ReplyHandler handler = correlation.get(correlationID); if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) { handler = waitForProvisionCorrelationToBeUpdated(correlationID, message); @@ -73,7 +73,7 @@ public class QueueReplyManager extends ReplyManagerSupport { if (handler != null) { correlation.remove(correlationID); - handler.onReply(correlationID, message); + handler.onReply(correlationID, message, session); } else { // we could not correlate the received reply message to a matching request and therefore // we cannot continue routing the unknown message http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java index 5579745..e05b5c4 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java @@ -17,6 +17,7 @@ package org.apache.camel.component.jms.reply; import javax.jms.Message; +import javax.jms.Session; /** * Handles a reply. @@ -29,9 +30,10 @@ public interface ReplyHandler { * The reply message was received * * @param correlationId the correlation id - * @param reply the reply message + * @param reply the JMS reply message + * @param session the JMS session */ - void onReply(String correlationId, Message reply); + void onReply(String correlationId, Message reply, Session session); /** * The reply message was not received and a timeout triggered http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java index a967a06..84945ad 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java @@ -17,6 +17,7 @@ package org.apache.camel.component.jms.reply; import javax.jms.Message; +import javax.jms.Session; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; @@ -32,6 +33,7 @@ public class ReplyHolder { private final Exchange exchange; private final AsyncCallback callback; private final Message message; + private final Session session; private final String originalCorrelationId; private final String correlationId; private long timeout; @@ -40,12 +42,13 @@ public class ReplyHolder { * Constructor to use when a reply message was received */ public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, - String correlationId, Message message) { + String correlationId, Message message, Session session) { this.exchange = exchange; this.callback = callback; this.originalCorrelationId = originalCorrelationId; this.correlationId = correlationId; this.message = message; + this.session = session; } /** @@ -53,7 +56,7 @@ public class ReplyHolder { */ public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long timeout) { - this(exchange, callback, originalCorrelationId, correlationId, null); + this(exchange, callback, originalCorrelationId, correlationId, null, null); this.timeout = timeout; } @@ -95,6 +98,16 @@ public class ReplyHolder { } /** + * Gets the JMS session from the received message + * + * @return the JMS session, or <tt>null</tt> if timeout occurred and no message has been received + * @see #isTimeout() + */ + public Session getSession() { + return session; + } + + /** * Whether timeout triggered or not. * <p/> * A timeout is triggered if <tt>requestTimeout</tt> option has been configured, and a reply message has <b>not</b> been http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java index 9eb0085..e3b65aa 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java @@ -20,11 +20,11 @@ import java.util.concurrent.ScheduledExecutorService; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.MessageListener; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.component.jms.JmsEndpoint; +import org.springframework.jms.listener.SessionAwareMessageListener; /** * The {@link ReplyManager} is responsible for handling <a href="http://camel.apache.org/request-reply.html">request-reply</a> @@ -32,7 +32,7 @@ import org.apache.camel.component.jms.JmsEndpoint; * * @version */ -public interface ReplyManager extends MessageListener { +public interface ReplyManager extends SessionAwareMessageListener { /** * Sets the belonging {@link org.apache.camel.component.jms.JmsEndpoint}. http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java index 83a7729..a2f70c3 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.Session; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; @@ -114,7 +115,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout); - public void onMessage(Message message) { + public void onMessage(Message message, Session session) throws JMSException { String correlationID = null; try { correlationID = message.getJMSCorrelationID(); @@ -129,14 +130,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl log.debug("Received reply message with correlationID [{}] -> {}", correlationID, message); // handle the reply message - handleReplyMessage(correlationID, message); + handleReplyMessage(correlationID, message, session); } public void processReply(ReplyHolder holder) { if (holder != null && isRunAllowed()) { try { Exchange exchange = holder.getExchange(); - Message message = holder.getMessage(); boolean timeout = holder.isTimeout(); if (timeout) { @@ -151,7 +151,9 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo; exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg)); } else { - JmsMessage response = new JmsMessage(message, endpoint.getBinding()); + Message message = holder.getMessage(); + Session session = holder.getSession(); + JmsMessage response = new JmsMessage(message, session, endpoint.getBinding()); // the JmsBinding is designed to be "pull-based": it will populate the Camel message on demand // therefore, we link Exchange and OUT message before continuing, so that the JmsBinding has full access // to everything it may need, and can populate headers, properties, etc. accordingly (solves CAMEL-6218). @@ -181,7 +183,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl } } - protected abstract void handleReplyMessage(String correlationID, Message message); + protected abstract void handleReplyMessage(String correlationID, Message message, Session session); protected abstract AbstractMessageListenerContainer createListenerContainer() throws Exception; http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java index a2b28a2..f752a05 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java @@ -17,6 +17,7 @@ package org.apache.camel.component.jms.reply; import javax.jms.Message; +import javax.jms.Session; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; @@ -47,9 +48,9 @@ public class TemporaryQueueReplyHandler implements ReplyHandler { this.timeout = timeout; } - public void onReply(String correlationId, Message reply) { + public void onReply(String correlationId, Message reply, Session session) { // create holder object with the the reply - ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, reply); + ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, reply, session); // process the reply replyManager.processReply(holder); } http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java index 0e3d98b..6d1f51d 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java @@ -72,7 +72,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { } @Override - protected void handleReplyMessage(String correlationID, Message message) { + protected void handleReplyMessage(String correlationID, Message message, Session session) { ReplyHandler handler = correlation.get(correlationID); if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) { handler = waitForProvisionCorrelationToBeUpdated(correlationID, message); @@ -80,7 +80,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { if (handler != null) { correlation.remove(correlationID); - handler.onReply(correlationID, message); + handler.onReply(correlationID, message, session); } else { // we could not correlate the received reply message to a matching request and therefore // we cannot continue routing the unknown message