This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new c966a1d  CAMEL-16366: camel-sjms - SJMS consumer supports exchange pooling
c966a1d is described below

commit c966a1d3460e483e62fcfacb3fe3b38a2e6b3a9f
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Apr 12 20:53:26 2021 +0200

    CAMEL-16366: camel-sjms - SJMS consumer supports exchange pooling
---
 .../org/apache/camel/component/sjms/SjmsMessage.java  | 10 ++++++++++
 .../sjms/consumer/EndpointMessageListener.java        | 19 +++++++++++++++++--
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
index 2f76d12..e683021 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
@@ -52,9 +52,19 @@ public class SjmsMessage extends DefaultMessage {
         setBinding(binding);
     }
 
+    public void init(Exchange exchange, Message jmsMessage, Session jmsSession, JmsBinding binding) {
+        setExchange(exchange);
+        setJmsMessage(jmsMessage);
+        setJmsSession(jmsSession);
+        setBinding(binding);
+        // need to populate initial headers when we use pooled exchanges
+        populateInitialHeaders(getHeaders());
+    }
+
     @Override
     public void reset() {
         super.reset();
+        setExchange(null);
         jmsMessage = null;
         jmsSession = null;
         binding = null;
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
index 99ae442..db0ebac 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
@@ -37,6 +37,7 @@ import org.apache.camel.component.sjms.SessionMessageListener;
 import org.apache.camel.component.sjms.SjmsConstants;
 import org.apache.camel.component.sjms.SjmsConsumer;
 import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.SjmsMessage;
 import org.apache.camel.component.sjms.SjmsTemplate;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -210,6 +211,8 @@ public class EndpointMessageListener implements SessionMessageListener {
             // if we failed processed the exchange from the async callback task, then grab the exception
             rce = exchange.getException(RuntimeCamelException.class);
 
+            // release back when synchronous mode
+            consumer.releaseExchange(exchange, false);
         } catch (Exception e) {
             rce = wrapRuntimeCamelException(e);
         }
@@ -237,8 +240,15 @@ public class EndpointMessageListener implements SessionMessageListener {
     }
 
     public Exchange createExchange(Message message, Session session, Object replyDestination) {
-        // must be prototype scoped (not pooled) so we create the exchange via endpoint
-        Exchange exchange = endpoint.createExchange(message, session);
+        Exchange exchange = consumer.createExchange(false);
+        // reuse existing jms message if pooled
+        org.apache.camel.Message msg = exchange.getIn();
+        if (msg instanceof SjmsMessage) {
+            SjmsMessage jm = (SjmsMessage) msg;
+            jm.init(exchange, message, session, endpoint.getBinding());
+        } else {
+            exchange.setIn(new SjmsMessage(exchange, message, session, endpoint.getBinding()));
+        }
 
         // lets set to an InOut if we have some kind of reply-to destination
         if (replyDestination != null && !disableReplyTo) {
@@ -459,6 +469,11 @@ public class EndpointMessageListener implements SessionMessageListener {
                     }
                 }
             }
+
+            if (!doneSync) {
+                // release back when in asynchronous mode
+                consumer.releaseExchange(exchange, false);
+            }
         }
     }
 

Reply via email to