Repository: camel Updated Branches: refs/heads/master 7864dd180 -> e275446a4
CAMEL-9606 Share JMS session among SJMS endpoints to ensure transaction atomicity Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/825a35fb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/825a35fb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/825a35fb Branch: refs/heads/master Commit: 825a35fbf67bc1b298bf8f9d58a5e2603d65063d Parents: 7864dd1 Author: Tomohisa Igarashi <tm.igara...@gmail.com> Authored: Tue Oct 18 22:05:54 2016 +0900 Committer: Tomohisa Igarashi <tm.igara...@gmail.com> Committed: Tue Oct 18 22:35:52 2016 +0900 ---------------------------------------------------------------------- .../src/main/docs/sjms-component.adoc | 3 +- .../camel/component/sjms/SjmsConstants.java | 2 + .../camel/component/sjms/SjmsConsumer.java | 10 ++ .../camel/component/sjms/SjmsEndpoint.java | 16 ++++ .../camel/component/sjms/SjmsProducer.java | 99 ++++++++++++++++++-- .../sjms/consumer/AbstractMessageHandler.java | 21 ++++- .../component/sjms/producer/InOnlyProducer.java | 33 ++----- .../component/sjms/producer/InOutProducer.java | 29 +----- 8 files changed, 148 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/docs/sjms-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/docs/sjms-component.adoc b/components/camel-sjms/src/main/docs/sjms-component.adoc index ef12d42..89cf99f 100644 --- a/components/camel-sjms/src/main/docs/sjms-component.adoc +++ b/components/camel-sjms/src/main/docs/sjms-component.adoc @@ -133,7 +133,7 @@ The Simple JMS component supports 9 options which are listed below. 
// endpoint options: START -The Simple JMS component supports 32 endpoint options which are listed below: +The Simple JMS component supports 33 endpoint options which are listed below: {% raw %} [width="100%",cols="2,1,1m,1m,5",options="header"] @@ -171,6 +171,7 @@ The Simple JMS component supports 32 endpoint options which are listed below: | transactionBatchCount | transaction | -1 | int | If transacted sets the number of messages to process before committing a transaction. | transactionBatchTimeout | transaction | 5000 | long | Sets timeout (in millis) for batch transactions the value should be 1000 or higher. | transactionCommitStrategy | transaction | | TransactionCommitStrategy | Sets the commit strategy. +| sharedJMSSession | transaction (advanced) | true | boolean | Specifies whether to share JMS session with other SJMS endpoints. Turn this off if your route is accessing multiple JMS providers. If you need transactions across multiple JMS providers, use the jms component to leverage XA transactions. 
|======================================================================= {% endraw %} // endpoint options: END http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java index f963cf3..bd36424 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java @@ -20,4 +20,6 @@ public interface SjmsConstants { String JMS_MESSAGE_TYPE = "JmsMessageType"; + String JMS_SESSION = "CamelJMSSession"; + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java index 162e252..581b337 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java @@ -241,6 +241,7 @@ public class SjmsConsumer extends DefaultConsumer { messageHandler.setProcessor(getAsyncProcessor()); messageHandler.setSynchronous(isSynchronous()); messageHandler.setTransacted(isTransacted()); + messageHandler.setSharedJMSSession(isSharedJMSSession()); messageHandler.setTopic(isTopic()); return messageHandler; } @@ -263,6 +264,14 @@ public class SjmsConsumer extends DefaultConsumer { } /** + * Use to determine if JMS session should be propagated to 
share with other SJMS endpoints. + * + * @return true if shared, otherwise false + */ + public boolean isSharedJMSSession() { + return getEndpoint().isSharedJMSSession(); + } + /** * Use to determine whether or not to process exchanges synchronously. * * @return true if synchronous @@ -342,4 +351,5 @@ public class SjmsConsumer extends DefaultConsumer { public long getTransactionBatchTimeout() { return getEndpoint().getTransactionBatchTimeout(); } + } http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java index f6d5ce9..858c68d 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java @@ -76,6 +76,8 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult private boolean includeAllJMSXProperties; @UriParam(label = "consumer,transaction") private boolean transacted; + @UriParam(label = "transaction,advanced", defaultValue = "true") + private boolean sharedJMSSession = true; @UriParam(label = "producer") private String namedReplyTo; @UriParam(defaultValue = "AUTO_ACKNOWLEDGE", enums = "SESSION_TRANSACTED,CLIENT_ACKNOWLEDGE,AUTO_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE") @@ -455,6 +457,20 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult this.transacted = transacted; } + public boolean isSharedJMSSession() { + return sharedJMSSession; + } + + /** + * Specifies whether to share JMS session with other SJMS endpoints. + * Turn this off if your route is accessing to multiple JMS providers. 
+ * If you need transaction against multiple JMS providers, use jms + * component to leverage XA transaction. + */ + public void setSharedJMSSession(boolean share) { + this.sharedJMSSession = share; + } + public String getNamedReplyTo() { return namedReplyTo; } http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java index 88e0ca2..39efd07 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java @@ -19,10 +19,17 @@ package org.apache.camel.component.sjms; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Session; + import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.component.sjms.jms.ConnectionResource; +import org.apache.camel.component.sjms.jms.JmsObjectFactory; +import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization; import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.util.ObjectHelper; import org.apache.commons.pool.BasePoolableObjectFactory; @@ -41,7 +48,7 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { @Override public MessageProducerResources makeObject() throws Exception { - return doCreateProducerModel(); + return doCreateProducerModel(createSession()); } @Override @@ -153,9 +160,48 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { return (SjmsEndpoint) super.getEndpoint(); } - 
public abstract MessageProducerResources doCreateProducerModel() throws Exception; + protected MessageProducerResources doCreateProducerModel(Session session) throws Exception { + MessageProducerResources answer; + try { + Destination destination = getEndpoint().getDestinationCreationStrategy().createDestination(session, getDestinationName(), isTopic()); + MessageProducer messageProducer = JmsObjectFactory.createMessageProducer(session, destination, isPersistent(), getTtl()); + + answer = new MessageProducerResources(session, messageProducer, getCommitStrategy()); + + } catch (Exception e) { + log.error("Unable to create the MessageProducer", e); + throw e; + } + return answer; + } + + protected Session createSession() throws Exception { + Connection conn = getConnectionResource().borrowConnection(); + try { + return conn.createSession(isEndpointTransacted(), getAcknowledgeMode()); + } catch (Exception e) { + log.error("Unable to create the Session", e); + throw e; + } finally { + getConnectionResource().returnConnection(conn); + } + } + + protected interface ReleaseProducerCallback { + void release(MessageProducerResources producer) throws Exception; + } + + protected class NOOPReleaseProducerCallback implements ReleaseProducerCallback { + public void release(MessageProducerResources producer) throws Exception { /* no-op */ } + } + + protected class ReturnProducerCallback implements ReleaseProducerCallback { + public void release(MessageProducerResources producer) throws Exception { + getProducers().returnObject(producer); + } + } - public abstract void sendMessage(Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception; + public abstract void sendMessage(Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer, final ReleaseProducerCallback releaseProducerCallback) throws Exception; @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { @@ -164,7 
+210,30 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { } try { - final MessageProducerResources producer = getProducers().borrowObject(); + MessageProducerResources producer = null; + ReleaseProducerCallback releaseProducerCallback = null; + if (isEndpointTransacted() && isSharedJMSSession()) { + Session session = exchange.getIn().getHeader(SjmsConstants.JMS_SESSION, Session.class); + if (session != null && session.getTransacted()) { + // Join existing transacted session - Synchronization must have been added + // by the session initiator + producer = doCreateProducerModel(session); + releaseProducerCallback = new NOOPReleaseProducerCallback(); + } else { + // Propagate JMS session and register Synchronization as an initiator + producer = getProducers().borrowObject(); + releaseProducerCallback = new ReturnProducerCallback(); + exchange.getIn().setHeader(SjmsConstants.JMS_SESSION, producer.getSession()); + exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy())); + } + } else { + producer = getProducers().borrowObject(); + releaseProducerCallback = new ReturnProducerCallback(); + if (isEndpointTransacted()) { + exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy())); + } + } + if (producer == null) { exchange.setException(new Exception("Unable to send message: connection not available")); } else { @@ -172,11 +241,13 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { if (log.isDebugEnabled()) { log.debug(" Sending message asynchronously: {}", exchange.getIn().getBody()); } + final MessageProducerResources finalProducer = producer; + final ReleaseProducerCallback finalrpc = releaseProducerCallback; getExecutor().execute(new Runnable() { @Override public void run() { try { - sendMessage(exchange, callback, producer); + sendMessage(exchange, callback, finalProducer, 
finalrpc); } catch (Exception e) { ObjectHelper.wrapRuntimeCamelException(e); } @@ -186,7 +257,7 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { if (log.isDebugEnabled()) { log.debug(" Sending message synchronously: {}", exchange.getIn().getBody()); } - sendMessage(exchange, callback, producer); + sendMessage(exchange, callback, producer, releaseProducerCallback); } } } catch (Exception e) { @@ -281,6 +352,15 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { } /** + * Test to determine if this endpoint should share a JMS Session with other SJMS endpoints. + * + * @return true if shared, otherwise false + */ + public boolean isSharedJMSSession() { + return getEndpoint().isSharedJMSSession(); + } + + /** * Returns the named reply to value for this producer * * @return true if it is a Topic, otherwise it is a Queue @@ -348,8 +428,11 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { * * @return TransactionCommitStrategy */ - public TransactionCommitStrategy getCommitStrategy() { - return getEndpoint().getTransactionCommitStrategy(); + protected TransactionCommitStrategy getCommitStrategy() { + if (isEndpointTransacted()) { + return getEndpoint().getTransactionCommitStrategy(); + } + return null; } } http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java index f394008..2cbc2ea 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java @@ -24,6 
+24,7 @@ import javax.jms.Session; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.sjms.SjmsConstants; import org.apache.camel.component.sjms.SjmsEndpoint; import org.apache.camel.spi.Synchronization; import org.slf4j.Logger; @@ -44,6 +45,7 @@ public abstract class AbstractMessageHandler implements MessageListener { private AsyncProcessor processor; private Session session; private boolean transacted; + private boolean sharedJMSSession; private boolean synchronous = true; private Synchronization synchronization; private boolean topic; @@ -72,8 +74,14 @@ public abstract class AbstractMessageHandler implements MessageListener { log.debug("Processing Exchange.id:{}", exchange.getExchangeId()); - if (isTransacted() && synchronization != null) { - exchange.addOnCompletion(synchronization); + if (isTransacted()) { + if (synchronization != null) { + exchange.addOnCompletion(synchronization); + } + if (isSharedJMSSession()) { + // Propagate a JMS Session as an initiator if sharedJMSSession is enabled + exchange.getIn().setHeader(SjmsConstants.JMS_SESSION, getSession()); + } } try { if (isTransacted() || isSynchronous()) { @@ -123,6 +131,14 @@ public abstract class AbstractMessageHandler implements MessageListener { return transacted; } + public void setSharedJMSSession(boolean share) { + this.sharedJMSSession = share; + } + + public boolean isSharedJMSSession() { + return sharedJMSSession; + } + public SjmsEndpoint getEndpoint() { return endpoint; } @@ -158,4 +174,5 @@ public abstract class AbstractMessageHandler implements MessageListener { public boolean isTopic() { return topic; } + } http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java ---------------------------------------------------------------------- diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java index 93f8648..e01db51 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java @@ -46,31 +46,15 @@ public class InOnlyProducer extends SjmsProducer { } @Override - public MessageProducerResources doCreateProducerModel() throws Exception { - MessageProducerResources answer; - Connection conn = getConnectionResource().borrowConnection(); - try { - TransactionCommitStrategy commitStrategy = null; - if (isEndpointTransacted()) { - commitStrategy = getCommitStrategy() == null ? new DefaultTransactionCommitStrategy() : getCommitStrategy(); - } - Session session = conn.createSession(isEndpointTransacted(), getAcknowledgeMode()); - Destination destination = getEndpoint().getDestinationCreationStrategy().createDestination(session, getDestinationName(), isTopic()); - MessageProducer messageProducer = JmsObjectFactory.createMessageProducer(session, destination, isPersistent(), getTtl()); - - answer = new MessageProducerResources(session, messageProducer, commitStrategy); - - } catch (Exception e) { - log.error("Unable to create the MessageProducer", e); - throw e; - } finally { - getConnectionResource().returnConnection(conn); + protected TransactionCommitStrategy getCommitStrategy() { + if (isEndpointTransacted()) { + return super.getCommitStrategy() == null ? 
new DefaultTransactionCommitStrategy() : super.getCommitStrategy(); } - return answer; + return null; } @Override - public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception { + public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer, final ReleaseProducerCallback releaseProducerCallback) throws Exception { try { Collection<Message> messages = new ArrayList<Message>(1); if (exchange.getIn().getBody() != null) { @@ -95,18 +79,13 @@ public class InOnlyProducer extends SjmsProducer { messages.add(message); } - if (isEndpointTransacted()) { - exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy())); - } for (final Message message : messages) { producer.getMessageProducer().send(message); } } catch (Exception e) { exchange.setException(new Exception("Unable to complete sending the message: ", e)); } finally { - if (producer != null) { - getProducers().returnObject(producer); - } + releaseProducerCallback.release(producer); callback.done(isSynchronous()); } } http://git-wip-us.apache.org/repos/asf/camel/blob/825a35fb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java index 202b429..68d13e2 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java @@ -168,37 +168,12 @@ public class InOutProducer extends SjmsProducer { } } - @Override - public MessageProducerResources 
doCreateProducerModel() throws Exception { - MessageProducerResources answer; - Connection conn = getConnectionResource().borrowConnection(); - try { - Session session = conn.createSession(isEndpointTransacted(), getAcknowledgeMode()); - Destination destination = getEndpoint().getDestinationCreationStrategy().createDestination(session, getDestinationName(), isTopic()); - MessageProducer messageProducer = JmsObjectFactory.createMessageProducer(session, destination, isPersistent(), getTtl()); - - answer = new MessageProducerResources(session, messageProducer); - - } catch (Exception e) { - log.error("Unable to create the MessageProducer", e); - throw e; - } finally { - getConnectionResource().returnConnection(conn); - } - - return answer; - } - /** * TODO time out is actually double as it waits for the producer and then * waits for the response. Use an atomic long to manage the countdown */ @Override - public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception { - if (isEndpointTransacted()) { - exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy())); - } - + public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer, final ReleaseProducerCallback releaseProducerCallback) throws Exception { Message request = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession()); String correlationId = exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class); @@ -221,7 +196,7 @@ public class InOutProducer extends SjmsProducer { // can move forward // without waiting on us to complete the exchange try { - getProducers().returnObject(producer); + releaseProducerCallback.release(producer); } catch (Exception exception) { // thrown if the pool is full. safe to ignore. }