Repository: camel
Updated Branches:
  refs/heads/master 7864dd180 -> e275446a4


CAMEL-9606 Share JMS session among SJMS endpoints to ensure transaction atomicity


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/825a35fb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/825a35fb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/825a35fb

Branch: refs/heads/master
Commit: 825a35fbf67bc1b298bf8f9d58a5e2603d65063d
Parents: 7864dd1
Author: Tomohisa Igarashi <tm.igara...@gmail.com>
Authored: Tue Oct 18 22:05:54 2016 +0900
Committer: Tomohisa Igarashi <tm.igara...@gmail.com>
Committed: Tue Oct 18 22:35:52 2016 +0900

----------------------------------------------------------------------
 .../src/main/docs/sjms-component.adoc           |  3 +-
 .../camel/component/sjms/SjmsConstants.java     |  2 +
 .../camel/component/sjms/SjmsConsumer.java      | 10 ++
 .../camel/component/sjms/SjmsEndpoint.java      | 16 ++++
 .../camel/component/sjms/SjmsProducer.java      | 99 ++++++++++++++++++--
 .../sjms/consumer/AbstractMessageHandler.java   | 21 ++++-
 .../component/sjms/producer/InOnlyProducer.java | 33 ++-----
 .../component/sjms/producer/InOutProducer.java  | 29 +-----
 8 files changed, 148 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/docs/sjms-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/docs/sjms-component.adoc b/components/camel-sjms/src/main/docs/sjms-component.adoc
index ef12d42..89cf99f 100644
--- a/components/camel-sjms/src/main/docs/sjms-component.adoc
+++ b/components/camel-sjms/src/main/docs/sjms-component.adoc
@@ -133,7 +133,7 @@ The Simple JMS component supports 9 options which are listed below.
 
 
 // endpoint options: START
-The Simple JMS component supports 32 endpoint options which are listed below:
+The Simple JMS component supports 33 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -171,6 +171,7 @@ The Simple JMS component supports 32 endpoint options which are listed below:
 | transactionBatchCount | transaction | -1 | int | If transacted sets the number of messages to process before committing a transaction.
 | transactionBatchTimeout | transaction | 5000 | long | Sets timeout (in millis) for batch transactions the value should be 1000 or higher.
 | transactionCommitStrategy | transaction |  | TransactionCommitStrategy | Sets the commit strategy.
+| sharedJMSSession | transaction (advanced) | true | boolean | Specifies whether to share JMS session with other SJMS endpoints. Turn this off if your route is accessing to multiple JMS providers. If you need transaction against multiple JMS providers use jms component to leverage XA transaction.
 |=======================================================================
 {% endraw %}
 // endpoint options: END

http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
index f963cf3..bd36424 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
@@ -20,4 +20,6 @@ public interface SjmsConstants {
 
     String JMS_MESSAGE_TYPE = "JmsMessageType";
 
+    String JMS_SESSION = "CamelJMSSession";
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
index 162e252..581b337 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
@@ -241,6 +241,7 @@ public class SjmsConsumer extends DefaultConsumer {
         messageHandler.setProcessor(getAsyncProcessor());
         messageHandler.setSynchronous(isSynchronous());
         messageHandler.setTransacted(isTransacted());
+        messageHandler.setSharedJMSSession(isSharedJMSSession());
         messageHandler.setTopic(isTopic());
         return messageHandler;
     }
@@ -263,6 +264,14 @@ public class SjmsConsumer extends DefaultConsumer {
     }
 
     /**
+     * Use to determine if JMS session should be propagated to share with 
other SJMS endpoints.
+     *
+     * @return true if shared, otherwise false
+     */
+    public boolean isSharedJMSSession() {
+        return getEndpoint().isSharedJMSSession();
+    }
+    /**
      * Use to determine whether or not to process exchanges synchronously.
      *
      * @return true if synchronous
@@ -342,4 +351,5 @@ public class SjmsConsumer extends DefaultConsumer {
     public long getTransactionBatchTimeout() {
         return getEndpoint().getTransactionBatchTimeout();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index f6d5ce9..858c68d 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -76,6 +76,8 @@ public class SjmsEndpoint extends DefaultEndpoint implements 
AsyncEndpoint, Mult
     private boolean includeAllJMSXProperties;
     @UriParam(label = "consumer,transaction")
     private boolean transacted;
+    @UriParam(label = "transaction,advanced", defaultValue = "true")
+    private boolean sharedJMSSession = true;
     @UriParam(label = "producer")
     private String namedReplyTo;
     @UriParam(defaultValue = "AUTO_ACKNOWLEDGE", enums = 
"SESSION_TRANSACTED,CLIENT_ACKNOWLEDGE,AUTO_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE")
@@ -455,6 +457,20 @@ public class SjmsEndpoint extends DefaultEndpoint 
implements AsyncEndpoint, Mult
         this.transacted = transacted;
     }
 
+    public boolean isSharedJMSSession() {
+        return sharedJMSSession;
+    }
+
+    /**
+     * Specifies whether to share JMS session with other SJMS endpoints.
+     * Turn this off if your route is accessing to multiple JMS providers.
+     * If you need transaction against multiple JMS providers, use jms
+     * component to leverage XA transaction.
+     */
+    public void setSharedJMSSession(boolean share) {
+        this.sharedJMSSession = share;
+    }
+
     public String getNamedReplyTo() {
         return namedReplyTo;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 88e0ca2..39efd07 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -19,10 +19,17 @@ package org.apache.camel.component.sjms;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.pool.BasePoolableObjectFactory;
@@ -41,7 +48,7 @@ public abstract class SjmsProducer extends 
DefaultAsyncProducer {
 
         @Override
         public MessageProducerResources makeObject() throws Exception {
-            return doCreateProducerModel();
+            return doCreateProducerModel(createSession());
         }
 
         @Override
@@ -153,9 +160,48 @@ public abstract class SjmsProducer extends 
DefaultAsyncProducer {
         return (SjmsEndpoint) super.getEndpoint();
     }
 
-    public abstract MessageProducerResources doCreateProducerModel() throws 
Exception;
+    protected MessageProducerResources doCreateProducerModel(Session session) 
throws Exception {
+        MessageProducerResources answer;
+        try {
+            Destination destination = 
getEndpoint().getDestinationCreationStrategy().createDestination(session, 
getDestinationName(), isTopic());
+            MessageProducer messageProducer = 
JmsObjectFactory.createMessageProducer(session, destination, isPersistent(), 
getTtl());
+
+            answer = new MessageProducerResources(session, messageProducer, 
getCommitStrategy());
+
+        } catch (Exception e) {
+            log.error("Unable to create the MessageProducer", e);
+            throw e;
+        }
+        return answer;
+    }
+
+    protected Session createSession() throws Exception {
+        Connection conn = getConnectionResource().borrowConnection();
+        try {
+            return conn.createSession(isEndpointTransacted(), 
getAcknowledgeMode());
+        } catch (Exception e) {
+            log.error("Unable to create the Session", e);
+            throw e;
+        } finally {
+            getConnectionResource().returnConnection(conn);
+        }
+    }
+
+    protected interface ReleaseProducerCallback {
+        void release(MessageProducerResources producer) throws Exception;
+    }
+
+    protected class NOOPReleaseProducerCallback implements 
ReleaseProducerCallback {
+        public void release(MessageProducerResources producer) throws 
Exception { /* no-op */ }
+    }
+
+    protected class ReturnProducerCallback implements ReleaseProducerCallback {
+        public void release(MessageProducerResources producer) throws 
Exception {
+            getProducers().returnObject(producer);
+        }
+    }
 
-    public abstract void sendMessage(Exchange exchange, final AsyncCallback 
callback, final MessageProducerResources producer) throws Exception;
+    public abstract void sendMessage(Exchange exchange, final AsyncCallback 
callback, final MessageProducerResources producer, final 
ReleaseProducerCallback releaseProducerCallback) throws Exception;
 
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
@@ -164,7 +210,30 @@ public abstract class SjmsProducer extends 
DefaultAsyncProducer {
         }
 
         try {
-            final MessageProducerResources producer = 
getProducers().borrowObject();
+            MessageProducerResources producer = null;
+            ReleaseProducerCallback releaseProducerCallback = null;
+            if (isEndpointTransacted() && isSharedJMSSession()) {
+                Session session = 
exchange.getIn().getHeader(SjmsConstants.JMS_SESSION, Session.class);
+                if (session != null && session.getTransacted()) {
+                    // Join existing transacted session - Synchronization must 
have been added
+                    // by the session initiator
+                    producer = doCreateProducerModel(session);
+                    releaseProducerCallback = new 
NOOPReleaseProducerCallback();
+                } else {
+                    // Propagate JMS session and register Synchronization as 
an initiator
+                    producer = getProducers().borrowObject();
+                    releaseProducerCallback = new ReturnProducerCallback();
+                    exchange.getIn().setHeader(SjmsConstants.JMS_SESSION, 
producer.getSession());
+                    exchange.getUnitOfWork().addSynchronization(new 
SessionTransactionSynchronization(producer.getSession(), 
producer.getCommitStrategy()));
+                }
+            } else {
+                producer = getProducers().borrowObject();
+                releaseProducerCallback = new ReturnProducerCallback();
+                if (isEndpointTransacted()) {
+                    exchange.getUnitOfWork().addSynchronization(new 
SessionTransactionSynchronization(producer.getSession(), 
producer.getCommitStrategy()));
+                }
+            }
+            
             if (producer == null) {
                 exchange.setException(new Exception("Unable to send message: 
connection not available"));
             } else {
@@ -172,11 +241,13 @@ public abstract class SjmsProducer extends 
DefaultAsyncProducer {
                     if (log.isDebugEnabled()) {
                         log.debug("  Sending message asynchronously: {}", 
exchange.getIn().getBody());
                     }
+                    final MessageProducerResources finalProducer = producer;
+                    final ReleaseProducerCallback finalrpc = 
releaseProducerCallback;
                     getExecutor().execute(new Runnable() {
                         @Override
                         public void run() {
                             try {
-                                sendMessage(exchange, callback, producer);
+                                sendMessage(exchange, callback, finalProducer, 
finalrpc);
                             } catch (Exception e) {
                                 ObjectHelper.wrapRuntimeCamelException(e);
                             }
@@ -186,7 +257,7 @@ public abstract class SjmsProducer extends 
DefaultAsyncProducer {
                     if (log.isDebugEnabled()) {
                         log.debug("  Sending message synchronously: {}", 
exchange.getIn().getBody());
                     }
-                    sendMessage(exchange, callback, producer);
+                    sendMessage(exchange, callback, producer, 
releaseProducerCallback);
                 }
             }
         } catch (Exception e) {
@@ -281,6 +352,15 @@ public abstract class SjmsProducer extends 
DefaultAsyncProducer {
     }
 
     /**
+     * Test to determine if this endpoint should share a JMS Session with 
other SJMS endpoints.
+     *
+     * @return true if shared, otherwise false
+     */
+    public boolean isSharedJMSSession() {
+        return getEndpoint().isSharedJMSSession();
+    }
+
+    /**
      * Returns the named reply to value for this producer
      *
      * @return true if it is a Topic, otherwise it is a Queue
@@ -348,8 +428,11 @@ public abstract class SjmsProducer extends 
DefaultAsyncProducer {
      *
      * @return TransactionCommitStrategy
      */
-    public TransactionCommitStrategy getCommitStrategy() {
-        return getEndpoint().getTransactionCommitStrategy();
+    protected TransactionCommitStrategy getCommitStrategy() {
+        if (isEndpointTransacted()) {
+            return getEndpoint().getTransactionCommitStrategy();
+        }
+        return null;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
index f394008..2cbc2ea 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
@@ -24,6 +24,7 @@ import javax.jms.Session;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.sjms.SjmsConstants;
 import org.apache.camel.component.sjms.SjmsEndpoint;
 import org.apache.camel.spi.Synchronization;
 import org.slf4j.Logger;
@@ -44,6 +45,7 @@ public abstract class AbstractMessageHandler implements 
MessageListener {
     private AsyncProcessor processor;
     private Session session;
     private boolean transacted;
+    private boolean sharedJMSSession;
     private boolean synchronous = true;
     private Synchronization synchronization;
     private boolean topic;
@@ -72,8 +74,14 @@ public abstract class AbstractMessageHandler implements 
MessageListener {
 
             log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
 
-            if (isTransacted() && synchronization != null) {
-                exchange.addOnCompletion(synchronization);
+            if (isTransacted()) {
+                if (synchronization != null) {
+                    exchange.addOnCompletion(synchronization);
+                }
+                if (isSharedJMSSession()) {
+                    // Propagate a JMS Session as an initiator if 
sharedJMSSession is enabled
+                    exchange.getIn().setHeader(SjmsConstants.JMS_SESSION, 
getSession());
+                }
             }
             try {
                 if (isTransacted() || isSynchronous()) {
@@ -123,6 +131,14 @@ public abstract class AbstractMessageHandler implements 
MessageListener {
         return transacted;
     }
 
+    public void setSharedJMSSession(boolean share) {
+        this.sharedJMSSession = share;
+    }
+
+    public boolean isSharedJMSSession() {
+        return sharedJMSSession;
+    }
+
     public SjmsEndpoint getEndpoint() {
         return endpoint;
     }
@@ -158,4 +174,5 @@ public abstract class AbstractMessageHandler implements 
MessageListener {
     public boolean isTopic() {
         return topic;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index 93f8648..e01db51 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -46,31 +46,15 @@ public class InOnlyProducer extends SjmsProducer {
     }
 
     @Override
-    public MessageProducerResources doCreateProducerModel() throws Exception {
-        MessageProducerResources answer;
-        Connection conn = getConnectionResource().borrowConnection();
-        try {
-            TransactionCommitStrategy commitStrategy = null;
-            if (isEndpointTransacted()) {
-                commitStrategy = getCommitStrategy() == null ? new 
DefaultTransactionCommitStrategy() : getCommitStrategy();
-            }
-            Session session = conn.createSession(isEndpointTransacted(), 
getAcknowledgeMode());
-            Destination destination = 
getEndpoint().getDestinationCreationStrategy().createDestination(session, 
getDestinationName(), isTopic());
-            MessageProducer messageProducer = 
JmsObjectFactory.createMessageProducer(session, destination, isPersistent(), 
getTtl());
-
-            answer = new MessageProducerResources(session, messageProducer, 
commitStrategy);
-
-        } catch (Exception e) {
-            log.error("Unable to create the MessageProducer", e);
-            throw e;
-        } finally {
-            getConnectionResource().returnConnection(conn);
+    protected TransactionCommitStrategy getCommitStrategy() {
+        if (isEndpointTransacted()) {
+            return super.getCommitStrategy() == null ? new 
DefaultTransactionCommitStrategy() : super.getCommitStrategy();
         }
-        return answer;
+        return null;
     }
 
     @Override
-    public void sendMessage(final Exchange exchange, final AsyncCallback 
callback, final MessageProducerResources producer) throws Exception {
+    public void sendMessage(final Exchange exchange, final AsyncCallback 
callback, final MessageProducerResources producer, final 
ReleaseProducerCallback releaseProducerCallback) throws Exception {
         try {
             Collection<Message> messages = new ArrayList<Message>(1);
             if (exchange.getIn().getBody() != null) {
@@ -95,18 +79,13 @@ public class InOnlyProducer extends SjmsProducer {
                 messages.add(message);
             }
 
-            if (isEndpointTransacted()) {
-                exchange.getUnitOfWork().addSynchronization(new 
SessionTransactionSynchronization(producer.getSession(), 
producer.getCommitStrategy()));
-            }
             for (final Message message : messages) {
                 producer.getMessageProducer().send(message);
             }
         } catch (Exception e) {
             exchange.setException(new Exception("Unable to complete sending 
the message: ", e));
         } finally {
-            if (producer != null) {
-                getProducers().returnObject(producer);
-            }
+            releaseProducerCallback.release(producer);
             callback.done(isSynchronous());
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index 202b429..68d13e2 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -168,37 +168,12 @@ public class InOutProducer extends SjmsProducer {
         }
     }
 
-    @Override
-    public MessageProducerResources doCreateProducerModel() throws Exception {
-        MessageProducerResources answer;
-        Connection conn = getConnectionResource().borrowConnection();
-        try {
-            Session session = conn.createSession(isEndpointTransacted(), 
getAcknowledgeMode());
-            Destination destination = 
getEndpoint().getDestinationCreationStrategy().createDestination(session, 
getDestinationName(), isTopic());
-            MessageProducer messageProducer = 
JmsObjectFactory.createMessageProducer(session, destination, isPersistent(), 
getTtl());
-
-            answer = new MessageProducerResources(session, messageProducer);
-
-        } catch (Exception e) {
-            log.error("Unable to create the MessageProducer", e);
-            throw e;
-        } finally {
-            getConnectionResource().returnConnection(conn);
-        }
-
-        return answer;
-    }
-
     /**
      * TODO time out is actually double as it waits for the producer and then
      * waits for the response. Use an atomic long to manage the countdown
      */
     @Override
-    public void sendMessage(final Exchange exchange, final AsyncCallback 
callback, final MessageProducerResources producer) throws Exception {
-        if (isEndpointTransacted()) {
-            exchange.getUnitOfWork().addSynchronization(new 
SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
-        }
-
+    public void sendMessage(final Exchange exchange, final AsyncCallback 
callback, final MessageProducerResources producer, final 
ReleaseProducerCallback releaseProducerCallback) throws Exception {
         Message request = getEndpoint().getBinding().makeJmsMessage(exchange, 
producer.getSession());
 
         String correlationId = 
exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class);
@@ -221,7 +196,7 @@ public class InOutProducer extends SjmsProducer {
         // can move forward
         // without waiting on us to complete the exchange
         try {
-            getProducers().returnObject(producer);
+            releaseProducerCallback.release(producer);
         } catch (Exception exception) {
             // thrown if the pool is full. safe to ignore.
         }

Reply via email to