This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new c966a1d CAMEL-16366: camel-sjms - SJMS consumer supports exchange pooling c966a1d is described below commit c966a1d3460e483e62fcfacb3fe3b38a2e6b3a9f Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Apr 12 20:53:26 2021 +0200 CAMEL-16366: camel-sjms - SJMS consumer supports exchange pooling --- .../org/apache/camel/component/sjms/SjmsMessage.java | 10 ++++++++++ .../sjms/consumer/EndpointMessageListener.java | 19 +++++++++++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java index 2f76d12..e683021 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java @@ -52,9 +52,19 @@ public class SjmsMessage extends DefaultMessage { setBinding(binding); } + public void init(Exchange exchange, Message jmsMessage, Session jmsSession, JmsBinding binding) { + setExchange(exchange); + setJmsMessage(jmsMessage); + setJmsSession(jmsSession); + setBinding(binding); + // need to populate initial headers when we use pooled exchanges + populateInitialHeaders(getHeaders()); + } + @Override public void reset() { super.reset(); + setExchange(null); jmsMessage = null; jmsSession = null; binding = null; diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java index 99ae442..db0ebac 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java @@ -37,6 +37,7 @@ import org.apache.camel.component.sjms.SessionMessageListener; import org.apache.camel.component.sjms.SjmsConstants; import org.apache.camel.component.sjms.SjmsConsumer; import org.apache.camel.component.sjms.SjmsEndpoint; +import org.apache.camel.component.sjms.SjmsMessage; import org.apache.camel.component.sjms.SjmsTemplate; import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.support.AsyncProcessorConverterHelper; @@ -210,6 +211,8 @@ public class EndpointMessageListener implements SessionMessageListener { // if we failed processed the exchange from the async callback task, then grab the exception rce = exchange.getException(RuntimeCamelException.class); + // release back when synchronous mode + consumer.releaseExchange(exchange, false); } catch (Exception e) { rce = wrapRuntimeCamelException(e); } @@ -237,8 +240,15 @@ public class EndpointMessageListener implements SessionMessageListener { } public Exchange createExchange(Message message, Session session, Object replyDestination) { - // must be prototype scoped (not pooled) so we create the exchange via endpoint - Exchange exchange = endpoint.createExchange(message, session); + Exchange exchange = consumer.createExchange(false); + // reuse existing jms message if pooled + org.apache.camel.Message msg = exchange.getIn(); + if (msg instanceof SjmsMessage) { + SjmsMessage jm = (SjmsMessage) msg; + jm.init(exchange, message, session, endpoint.getBinding()); + } else { + exchange.setIn(new SjmsMessage(exchange, message, session, endpoint.getBinding())); + } // lets set to an InOut if we have some kind of reply-to destination if (replyDestination != null && !disableReplyTo) { @@ -459,6 +469,11 @@ public class EndpointMessageListener implements SessionMessageListener { } } } + + if (!doneSync) { + // release back when in asynchronous mode + consumer.releaseExchange(exchange, false); + } } }