Updated Branches: refs/heads/master 0ab6bba5b -> dbe794c37
CAMEL-6362: Consumers should always use dedicated Sessions Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/520e55fa Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/520e55fa Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/520e55fa Branch: refs/heads/master Commit: 520e55fa81f7aa9faa65960ad36be0af8f3d03d0 Parents: 29e4411 Author: Scott England-Sullivan <sully6...@apache.org> Authored: Tue Oct 29 11:17:35 2013 -0500 Committer: Scott England-Sullivan <sully6...@apache.org> Committed: Tue Oct 29 11:17:35 2013 -0500 ---------------------------------------------------------------------- .../camel/component/sjms/SjmsConsumer.java | 75 +++++------ .../camel/component/sjms/SjmsEndpoint.java | 42 +----- .../camel/component/sjms/SjmsProducer.java | 5 +- .../component/sjms/producer/InOnlyProducer.java | 102 +++++++++------ .../component/sjms/producer/InOutProducer.java | 127 ++++++++++++------- .../InOnlyTopicDurableConsumerTest.java | 4 +- 6 files changed, 180 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/520e55fa/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java index 806537d..5b01c3a 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java @@ -23,6 +23,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; +import org.apache.camel.CamelException; import org.apache.camel.Endpoint; import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; @@ -32,7 +33,6 @@ import org.apache.camel.component.sjms.consumer.InOutMessageHandler; import org.apache.camel.component.sjms.jms.ConnectionResource; import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.component.sjms.jms.ObjectPool; -import org.apache.camel.component.sjms.jms.SessionPool; import org.apache.camel.component.sjms.taskmanager.TimedTaskManager; import org.apache.camel.component.sjms.tx.BatchTransactionCommitStrategy; import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy; @@ -66,12 +66,7 @@ public class SjmsConsumer extends DefaultConsumer { */ @Override protected MessageConsumerResources createObject() throws Exception { - MessageConsumerResources model = null; - if (isTransacted() || getEndpoint().getExchangePattern().equals(ExchangePattern.InOut)) { - model = createConsumerWithDedicatedSession(); - } else { - model = createConsumerListener(); - } + MessageConsumerResources model = createConsumer(); return model; } @@ -168,39 +163,41 @@ public class SjmsConsumer extends DefaultConsumer { * Creates a {@link MessageConsumerResources} with a dedicated * {@link Session} required for transacted and InOut consumers. */ - private MessageConsumerResources createConsumerWithDedicatedSession() throws Exception { - Connection conn = getConnectionResource().borrowConnection(); - Session session = null; - if (isTransacted()) { - session = conn.createSession(true, Session.SESSION_TRANSACTED); - } else { - session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + @SuppressWarnings("unused") + private MessageConsumerResources createConsumer() throws Exception { + MessageConsumerResources answer = null; + Connection conn = null; + try { + conn = getConnectionResource().borrowConnection(); + + Session session = null; + MessageConsumer messageConsumer = null; + if (isTransacted()) { + session = conn.createSession(true, Session.SESSION_TRANSACTED); + } else { + session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + messageConsumer = JmsObjectFactory.createMessageConsumer(session, getDestinationName(), getMessageSelector(), isTopic(), getDurableSubscriptionId()); + MessageListener handler = createMessageHandler(session); + messageConsumer.setMessageListener(handler); + + if (session == null) { + throw new CamelException("Message Consumer Creation Exception: Session is NULL"); + } + if (messageConsumer == null) { + throw new CamelException("Message Consumer Creation Exception: MessageConsumer is NULL"); + } + answer = new MessageConsumerResources(session, messageConsumer); + } catch (Exception e) { + log.error("Unable to create the MessageConsumer: " + e.getLocalizedMessage()); + } finally { + if (conn != null) { + getConnectionResource().returnConnection(conn); + } } - MessageConsumer messageConsumer = JmsObjectFactory.createMessageConsumer(session, getDestinationName(), getMessageSelector(), isTopic(), getDurableSubscriptionId()); - MessageListener handler = createMessageHandler(session); - messageConsumer.setMessageListener(handler); - getConnectionResource().returnConnection(conn); - return new MessageConsumerResources(session, messageConsumer); + return answer; } - /** - * Creates a {@link MessageConsumerResources} with a shared {@link Session} - * for non-transacted InOnly consumers. - */ - private MessageConsumerResources createConsumerListener() throws Exception { - Session queueSession = getSessionPool().borrowObject(); - MessageConsumer messageConsumer = null; - if (isTopic()) { - messageConsumer = JmsObjectFactory.createTopicConsumer(queueSession, getDestinationName(), getMessageSelector()); - } else { - messageConsumer = JmsObjectFactory.createQueueConsumer(queueSession, getDestinationName(), getMessageSelector()); - } - getSessionPool().returnObject(queueSession); - // Don't pass in the session. Only needed if we are transacted - MessageListener handler = createMessageHandler(null); - messageConsumer.setMessageListener(handler); - return new MessageConsumerResources(messageConsumer); - } /** * Helper factory method used to create a MessageListener based on the MEP @@ -253,10 +250,6 @@ public class SjmsConsumer extends DefaultConsumer { return getEndpoint().getConnectionResource(); } - protected SessionPool getSessionPool() { - return getEndpoint().getSessions(); - } - public int getAcknowledgementMode() { return getEndpoint().getAcknowledgementMode().intValue(); } http://git-wip-us.apache.org/repos/asf/camel/blob/520e55fa/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java index edaa70c..63118ec 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java @@ -25,7 +25,6 @@ import org.apache.camel.Producer; import org.apache.camel.component.sjms.jms.ConnectionResource; import org.apache.camel.component.sjms.jms.KeyFormatStrategy; import org.apache.camel.component.sjms.jms.SessionAcknowledgementType; -import org.apache.camel.component.sjms.jms.SessionPool; import org.apache.camel.component.sjms.producer.InOnlyProducer; import org.apache.camel.component.sjms.producer.InOutProducer; import org.apache.camel.impl.DefaultEndpoint; @@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory; public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSupport { protected final Logger logger = LoggerFactory.getLogger(getClass()); - private SessionPool sessions; + @UriParam private boolean synchronous = true; @UriParam @@ -95,28 +94,10 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu @Override protected void doStart() throws Exception { super.doStart(); - - // - // TODO since we only need a session pool for one use case, find a - // better way - // - // We only create a session pool when we are not transacted. - // Transacted listeners or producers need to be paired with the - // Session that created them. - if (!isTransacted() && getExchangePattern().equals(ExchangePattern.InOnly)) { - sessions = new SessionPool(getSessionCount(), getConnectionResource()); - - // TODO fix the string hack - sessions.setAcknowledgeMode(SessionAcknowledgementType.valueOf(getAcknowledgementMode() + "")); - getSessions().fillPool(); - } } @Override protected void doStop() throws Exception { - if (getSessions() != null) { - getSessions().drainPool(); - } super.doStop(); } @@ -169,25 +150,6 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu } /** - * Returns a SessionPool if available. - * - * @return the sessions - */ - public SessionPool getSessions() { - return sessions; - } - - /** - * SessionPool used by endpoints that do not require a dedicated session per - * consumer or producer. - * - * @param sessions default null - */ - public void setSessions(SessionPool sessions) { - this.sessions = sessions; - } - - /** * Use to determine whether or not to process exchanges synchronously. * * @return true if endoint is synchronous, otherwise false @@ -239,6 +201,7 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu * * @return the sessionCount */ + @Deprecated public int getSessionCount() { return sessionCount; } @@ -250,6 +213,7 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu * * @param sessionCount the number of Session instances, default is 1 */ + @Deprecated public void setSessionCount(int sessionCount) { this.sessionCount = sessionCount; } http://git-wip-us.apache.org/repos/asf/camel/blob/520e55fa/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java index d4e1a1d..c5d9c1f 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java @@ -157,7 +157,7 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { if (log.isDebugEnabled()) { log.debug("Processing Exchange.id:{}", exchange.getExchangeId()); } - + try { if (!isSynchronous()) { if (log.isDebugEnabled()) { @@ -189,10 +189,9 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { exchange.setException(e); } log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - SUCCESS"); - + return isSynchronous(); } - protected SjmsEndpoint getSjmsEndpoint() { return (SjmsEndpoint)this.getEndpoint(); http://git-wip-us.apache.org/repos/asf/camel/blob/520e55fa/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java index ca49765..df689b2 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java @@ -51,32 +51,44 @@ public class InOnlyProducer extends SjmsProducer { */ @Override public MessageProducerResources doCreateProducerModel() throws Exception { - Connection conn = getConnectionResource().borrowConnection(); - TransactionCommitStrategy commitStrategy = null; - Session session = null; - if (isEndpointTransacted()) { - if (getCommitStrategy() != null) { - commitStrategy = getCommitStrategy(); + MessageProducerResources answer = null; + Connection conn = null; + try { + conn = getConnectionResource().borrowConnection(); + + TransactionCommitStrategy commitStrategy = null; + Session session = null; + MessageProducer messageProducer = null; + + if (isEndpointTransacted()) { + if (getCommitStrategy() != null) { + commitStrategy = getCommitStrategy(); + } else { + commitStrategy = new DefaultTransactionCommitStrategy(); + } + session = conn.createSession(true, getAcknowledgeMode()); } else { - commitStrategy = new DefaultTransactionCommitStrategy(); + session = conn.createSession(false, getAcknowledgeMode()); + } + if (isTopic()) { + messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl()); + } else { + messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName()); + } + answer = new MessageProducerResources(session, messageProducer, commitStrategy); + } catch (Exception e) { + log.error("Unable to create the MessageProducer: " + e.getLocalizedMessage()); + } finally { + if (conn != null) { + getConnectionResource().returnConnection(conn); } - session = conn.createSession(true, getAcknowledgeMode()); - } else { - session = conn.createSession(false, getAcknowledgeMode()); - } - MessageProducer messageProducer = null; - if (isTopic()) { - messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl()); - } else { - messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName()); } - getConnectionResource().returnConnection(conn); - return new MessageProducerResources(session, messageProducer, commitStrategy); + return answer; } /* - * @see org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback) - * + * @see + * org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback) * @param exchange * @param callback * @throws Exception @@ -85,34 +97,40 @@ public class InOnlyProducer extends SjmsProducer { public void sendMessage(Exchange exchange, AsyncCallback callback) throws Exception { List<Message> messages = new ArrayList<Message>(); MessageProducerResources producer = getProducers().borrowObject(); - if (getProducers() != null) { - if (exchange.getIn().getBody() != null) { - if (exchange.getIn().getBody() instanceof List) { - List<?> payload = (List<?>)exchange.getIn().getBody(); - for (Object object : payload) { - Message message = null; - if (BatchMessage.class.isInstance(object)) { - BatchMessage<?> batchMessage = (BatchMessage<?>)object; - message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint() - .getJmsKeyFormatStrategy()); - } else { - message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy()); + try { + if (getProducers() != null) { + if (exchange.getIn().getBody() != null) { + if (exchange.getIn().getBody() instanceof List) { + List<?> payload = (List<?>)exchange.getIn().getBody(); + for (Object object : payload) { + Message message = null; + if (BatchMessage.class.isInstance(object)) { + BatchMessage<?> batchMessage = (BatchMessage<?>)object; + message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint() + .getJmsKeyFormatStrategy()); + } else { + message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy()); + } + messages.add(message); } + } else { + Object payload = exchange.getIn().getBody(); + Message message = JmsMessageHelper + .createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy()); messages.add(message); } - } else { - Object payload = exchange.getIn().getBody(); - Message message = JmsMessageHelper.createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy()); - messages.add(message); } - } - if (isEndpointTransacted()) { - exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy())); - } - for (Message message : messages) { - producer.getMessageProducer().send(message); + if (isEndpointTransacted()) { + exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy())); + } + for (Message message : messages) { + producer.getMessageProducer().send(message); + } } + } catch (Exception e) { + exchange.setException(new Exception("Unable to complet sending the message: " + e.getLocalizedMessage())); + } finally { getProducers().returnObject(producer); callback.done(isSynchronous()); } http://git-wip-us.apache.org/repos/asf/camel/blob/520e55fa/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java index c98e136..0936ecf 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java @@ -79,40 +79,49 @@ public class InOutProducer extends SjmsProducer { @Override protected MessageConsumerResource createObject() throws Exception { - Connection conn = getConnectionResource().borrowConnection(); + MessageConsumerResource answer = null; + Connection conn = null; Session session = null; - if (isEndpointTransacted()) { - session = conn.createSession(true, Session.SESSION_TRANSACTED); - } else { - session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - getConnectionResource().returnConnection(conn); - Destination replyToDestination = null; - if (ObjectHelper.isEmpty(getNamedReplyTo())) { - replyToDestination = JmsObjectFactory.createTemporaryDestination(session, isTopic()); - } else { - replyToDestination = JmsObjectFactory.createDestination(session, getNamedReplyTo(), isTopic()); - } - MessageConsumer messageConsumer = JmsObjectFactory.createMessageConsumer(session, replyToDestination, null, isTopic(), null, true); - messageConsumer.setMessageListener(new MessageListener() { - - @Override - public void onMessage(Message message) { - if (logger.isDebugEnabled()) { - logger.debug("Message Received in the Consumer Pool"); - logger.debug(" Message : {}", message); - } - try { - Exchanger<Object> exchanger = exchangerMap.get(message.getJMSCorrelationID()); - exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS); - } catch (Exception e) { - ObjectHelper.wrapRuntimeCamelException(e); - } + try { + conn = getConnectionResource().borrowConnection(); + if (isEndpointTransacted()) { + session = conn.createSession(true, Session.SESSION_TRANSACTED); + } else { + session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + Destination replyToDestination = null; + if (ObjectHelper.isEmpty(getNamedReplyTo())) { + replyToDestination = JmsObjectFactory.createTemporaryDestination(session, isTopic()); + } else { + replyToDestination = JmsObjectFactory.createDestination(session, getNamedReplyTo(), isTopic()); } - }); - MessageConsumerResource mcm = new MessageConsumerResource(session, messageConsumer, replyToDestination); - return mcm; + MessageConsumer messageConsumer = JmsObjectFactory.createMessageConsumer(session, replyToDestination, null, isTopic(), null, true); + messageConsumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + if (logger.isDebugEnabled()) { + logger.debug("Message Received in the Consumer Pool"); + logger.debug(" Message : {}", message); + } + try { + Exchanger<Object> exchanger = exchangerMap.get(message.getJMSCorrelationID()); + exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS); + } catch (Exception e) { + ObjectHelper.wrapRuntimeCamelException(e); + } + + } + }); + answer = new MessageConsumerResource(session, messageConsumer, replyToDestination); + } catch (Exception e) { + log.error("Unable to create the MessageConsumerResource: " + e.getLocalizedMessage()); + throw new CamelException(e); + } finally { + getConnectionResource().returnConnection(conn); + } + return answer; } @Override @@ -227,21 +236,42 @@ public class InOutProducer extends SjmsProducer { @Override public MessageProducerResources doCreateProducerModel() throws Exception { - Connection conn = getConnectionResource().borrowConnection(); - Session session = null; - if (isEndpointTransacted()) { - session = conn.createSession(true, getAcknowledgeMode()); - } else { - session = conn.createSession(false, getAcknowledgeMode()); - } - MessageProducer messageProducer = null; - if (isTopic()) { - messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl()); - } else { - messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName()); + MessageProducerResources answer = null; + Connection conn = null; + try { + MessageProducer messageProducer = null; + Session session = null; + + conn = getConnectionResource().borrowConnection(); + if (isEndpointTransacted()) { + session = conn.createSession(true, getAcknowledgeMode()); + } else { + session = conn.createSession(false, getAcknowledgeMode()); + } + if (isTopic()) { + messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl()); + } else { + messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName()); + } + + if (session == null) { + throw new CamelException("Message Consumer Creation Exception: Session is NULL"); + } + if (messageProducer == null) { + throw new CamelException("Message Consumer Creation Exception: MessageProducer is NULL"); + } + + answer = new MessageProducerResources(session, messageProducer); + + } catch (Exception e) { + log.error("Unable to create the MessageProducer: " + e.getLocalizedMessage()); + } finally { + if (conn != null) { + getConnectionResource().returnConnection(conn); + } } - getConnectionResource().returnConnection(conn); - return new MessageProducerResources(session, messageProducer); + + return answer; } /** @@ -269,9 +299,9 @@ public class InOutProducer extends SjmsProducer { if (isEndpointTransacted()) { exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy())); } - + Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy()); - + // TODO just set the correlation id don't get it from the // message String correlationId = null; @@ -295,7 +325,8 @@ public class InOutProducer extends SjmsProducer { consumers.returnObject(consumer); producer.getMessageProducer().send(request); - // Return the producer to the pool so another waiting producer can move forward + // Return the producer to the pool so another waiting producer + // can move forward // without waiting on us to complete the exchange try { getProducers().returnObject(producer); http://git-wip-us.apache.org/repos/asf/camel/blob/520e55fa/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java index 9fa0a80..2420476 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java @@ -78,10 +78,10 @@ public class InOnlyTopicDurableConsumerTest extends CamelTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { - from("sjms:topic:foo?durableSubscriptionId=bar") + from("sjms:topic:foo?durableSubscriptionId=bar1") .to("mock:result"); - from("sjms:topic:foo?durableSubscriptionId=bar") + from("sjms:topic:foo?durableSubscriptionId=bar2") .to("mock:result2"); } };