CAMEL-7908 Add a DestinationCreationStrategy to the SJMS component with thanks to Aaron
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cb784c84 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cb784c84 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cb784c84 Branch: refs/heads/master Commit: cb784c8495bcdb85150852c827c8fbc8b500be2f Parents: eb4300e Author: Willem Jiang <willem.ji...@gmail.com> Authored: Fri Nov 7 10:33:42 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Fri Nov 7 10:33:42 2014 +0800 ---------------------------------------------------------------------- .../camel/component/sjms/SjmsComponent.java | 13 +++ .../camel/component/sjms/SjmsConsumer.java | 32 ++---- .../camel/component/sjms/SjmsEndpoint.java | 11 ++ .../sjms/SjmsExchangeMessageHelper.java | 1 + .../camel/component/sjms/SjmsProducer.java | 2 +- .../sjms/consumer/AbstractMessageHandler.java | 10 +- .../sjms/consumer/InOnlyMessageHandler.java | 5 +- .../sjms/consumer/InOutMessageHandler.java | 9 +- .../jms/DefaultDestinationCreationStrategy.java | 43 ++++++++ .../sjms/jms/DestinationCreationStrategy.java | 13 +++ .../component/sjms/jms/JmsObjectFactory.java | 108 ++----------------- .../component/sjms/producer/InOnlyProducer.java | 28 ++--- .../component/sjms/producer/InOutProducer.java | 76 +++---------- .../SjmsDestinationCreationStrategyTest.java | 79 ++++++++++++++ .../DefaultDestinationCreationStrategyTest.java | 53 +++++++++ .../sjms/producer/InOnlyQueueProducerTest.java | 3 +- .../sjms/producer/InOnlyTopicProducerTest.java | 3 +- .../InOutQueueProducerAsyncLoadTest.java | 5 +- .../InOutQueueProducerSyncLoadTest.java | 5 +- .../sjms/producer/InOutQueueProducerTest.java | 7 +- .../producer/InOutTempQueueProducerTest.java | 5 +- .../sjms/producer/QueueProducerTest.java | 3 +- .../component/sjms/support/JmsTestSupport.java | 18 +++- 23 files changed, 295 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java index fe79846..930dc0c 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java @@ -27,6 +27,7 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.component.sjms.jms.ConnectionFactoryResource; import org.apache.camel.component.sjms.jms.ConnectionResource; import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy; +import org.apache.camel.component.sjms.jms.DestinationCreationStrategy; import org.apache.camel.component.sjms.jms.KeyFormatStrategy; import org.apache.camel.component.sjms.taskmanager.TimedTaskManager; import org.apache.camel.impl.UriEndpointComponent; @@ -49,6 +50,7 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS private Integer connectionCount = 1; private TransactionCommitStrategy transactionCommitStrategy; private TimedTaskManager timedTaskManager; + private DestinationCreationStrategy destinationCreationStrategy; private ExecutorService asyncStartStopExecutorService; public SjmsComponent() { @@ -67,6 +69,9 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS if (transactionCommitStrategy != null) { endpoint.setTransactionCommitStrategy(transactionCommitStrategy); } + if (destinationCreationStrategy != null) { + endpoint.setDestinationCreationStrategy(destinationCreationStrategy); + } return endpoint; } @@ -244,6 +249,14 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS this.transactionCommitStrategy = commitStrategy; } + public DestinationCreationStrategy getDestinationCreationStrategy() { + return destinationCreationStrategy; + } + + public void setDestinationCreationStrategy(DestinationCreationStrategy destinationCreationStrategy) { + this.destinationCreationStrategy = destinationCreationStrategy; + } + public TimedTaskManager getTimedTaskManager() { return timedTaskManager; } http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java index d93cf82..f1959b7 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java @@ -19,11 +19,11 @@ package org.apache.camel.component.sjms; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; -import org.apache.camel.CamelException; import org.apache.camel.Endpoint; import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; @@ -179,35 +179,21 @@ public class SjmsConsumer extends DefaultConsumer { */ @SuppressWarnings("unused") private MessageConsumerResources createConsumer() throws Exception { - MessageConsumerResources answer = null; - Connection conn = null; + MessageConsumerResources answer; + Connection conn = getConnectionResource().borrowConnection(); try { - conn = getConnectionResource().borrowConnection(); - - Session session = null; - MessageConsumer messageConsumer = null; - if (isTransacted()) { - session = conn.createSession(true, Session.SESSION_TRANSACTED); - } else { - session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - messageConsumer = JmsObjectFactory.createMessageConsumer(session, getDestinationName(), getMessageSelector(), isTopic(), getDurableSubscriptionId()); + Session session = conn.createSession(isTransacted(), isTransacted() ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + Destination destination = getEndpoint().getDestinationCreationStrategy().createDestination(session, getDestinationName(), isTopic()); + MessageConsumer messageConsumer = JmsObjectFactory.createMessageConsumer(session, destination, getMessageSelector(), isTopic(), getDurableSubscriptionId()); MessageListener handler = createMessageHandler(session); messageConsumer.setMessageListener(handler); - if (session == null) { - throw new CamelException("Message Consumer Creation Exception: Session is NULL"); - } - if (messageConsumer == null) { - throw new CamelException("Message Consumer Creation Exception: MessageConsumer is NULL"); - } answer = new MessageConsumerResources(session, messageConsumer); } catch (Exception e) { - log.error("Unable to create the MessageConsumer: " + e.getLocalizedMessage()); + log.error("Unable to create the MessageConsumer", e); + throw e; } finally { - if (conn != null) { - getConnectionResource().returnConnection(conn); - } + getConnectionResource().returnConnection(conn); } return answer; } http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java index 61d4bea..60bff4e 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java @@ -23,6 +23,8 @@ import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.sjms.jms.ConnectionResource; +import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy; +import org.apache.camel.component.sjms.jms.DestinationCreationStrategy; import org.apache.camel.component.sjms.jms.KeyFormatStrategy; import org.apache.camel.component.sjms.jms.SessionAcknowledgementType; import org.apache.camel.component.sjms.producer.InOnlyProducer; @@ -77,6 +79,8 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu @UriParam private boolean prefillPool = true; private TransactionCommitStrategy transactionCommitStrategy; +// @UriParam + private DestinationCreationStrategy destinationCreationStrategy = new DefaultDestinationCreationStrategy(); public SjmsEndpoint() { } @@ -479,4 +483,11 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu this.prefillPool = prefillPool; } + public DestinationCreationStrategy getDestinationCreationStrategy() { + return destinationCreationStrategy; + } + + public void setDestinationCreationStrategy(DestinationCreationStrategy destinationCreationStrategy) { + this.destinationCreationStrategy = destinationCreationStrategy; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java index 6efbb9f..efe13c7 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java @@ -50,6 +50,7 @@ import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import static org.apache.camel.component.sjms.SjmsConstants.JMS_MESSAGE_TYPE; import static org.apache.camel.component.sjms.SjmsConstants.QUEUE_PREFIX; import static org.apache.camel.component.sjms.SjmsConstants.TOPIC_PREFIX; http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java index 14795e9..f26cb2a 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java @@ -129,7 +129,7 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { getProducers().close(); setProducers(null); } catch (Throwable e) { - log.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e); + log.warn("Error closing producers on destination: " + getDestinationName() + ". This exception will be ignored.", e); } } http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java index 67ce398..763b2a7 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java @@ -22,9 +22,9 @@ import javax.jms.MessageListener; import javax.jms.Session; import org.apache.camel.AsyncProcessor; -import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.sjms.SjmsEndpoint; import org.apache.camel.component.sjms.SjmsExchangeMessageHelper; import org.apache.camel.component.sjms.TransactionCommitStrategy; import org.apache.camel.impl.DefaultExchange; @@ -43,7 +43,7 @@ public abstract class AbstractMessageHandler implements MessageListener { private final ExecutorService executor; - private Endpoint endpoint; + private SjmsEndpoint endpoint; private AsyncProcessor processor; private Session session; private boolean transacted; @@ -52,12 +52,12 @@ public abstract class AbstractMessageHandler implements MessageListener { private boolean topic; private TransactionCommitStrategy commitStrategy; - public AbstractMessageHandler(Endpoint endpoint, ExecutorService executor) { + public AbstractMessageHandler(SjmsEndpoint endpoint, ExecutorService executor) { this.endpoint = endpoint; this.executor = executor; } - public AbstractMessageHandler(Endpoint endpoint, ExecutorService executor, Synchronization synchronization) { + public AbstractMessageHandler(SjmsEndpoint endpoint, ExecutorService executor, Synchronization synchronization) { this.synchronization = synchronization; this.endpoint = endpoint; this.executor = executor; @@ -134,7 +134,7 @@ public abstract class AbstractMessageHandler implements MessageListener { return transacted; } - public Endpoint getEndpoint() { + public SjmsEndpoint getEndpoint() { return endpoint; } http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java index aa21578..e23844e 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService; import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.component.sjms.SjmsEndpoint; import org.apache.camel.spi.Synchronization; /** @@ -32,7 +33,7 @@ public class InOnlyMessageHandler extends AbstractMessageHandler { * @param endpoint * @param executor */ - public InOnlyMessageHandler(Endpoint endpoint, ExecutorService executor) { + public InOnlyMessageHandler(SjmsEndpoint endpoint, ExecutorService executor) { super(endpoint, executor); } @@ -41,7 +42,7 @@ public class InOnlyMessageHandler extends AbstractMessageHandler { * @param executor * @param synchronization */ - public InOnlyMessageHandler(Endpoint endpoint, ExecutorService executor, Synchronization synchronization) { + public InOnlyMessageHandler(SjmsEndpoint endpoint, ExecutorService executor, Synchronization synchronization) { super(endpoint, executor, synchronization); } http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java index a9a6a41..15d39d2 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java @@ -34,7 +34,6 @@ import org.apache.camel.Exchange; import org.apache.camel.component.sjms.SjmsEndpoint; import org.apache.camel.component.sjms.SjmsExchangeMessageHelper; import org.apache.camel.component.sjms.jms.JmsMessageHelper; -import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.spi.Synchronization; import org.apache.camel.util.ObjectHelper; @@ -47,11 +46,11 @@ public class InOutMessageHandler extends AbstractMessageHandler { private Map<String, MessageProducer> producerCache = new TreeMap<String, MessageProducer>(); private ReadWriteLock lock = new ReentrantReadWriteLock(); - public InOutMessageHandler(Endpoint endpoint, ExecutorService executor) { + public InOutMessageHandler(SjmsEndpoint endpoint, ExecutorService executor) { super(endpoint, executor); } - public InOutMessageHandler(Endpoint endpoint, ExecutorService executor, Synchronization synchronization) { + public InOutMessageHandler(SjmsEndpoint endpoint, ExecutorService executor, Synchronization synchronization) { super(endpoint, executor, synchronization); } @@ -61,11 +60,11 @@ public class InOutMessageHandler extends AbstractMessageHandler { MessageProducer messageProducer = null; Object obj = exchange.getIn().getHeader(JmsMessageHelper.JMS_REPLY_TO); if (obj != null) { - Destination replyTo = null; + Destination replyTo; if (isDestination(obj)) { replyTo = (Destination) obj; } else if (obj instanceof String) { - replyTo = JmsObjectFactory.createDestination(getSession(), (String) obj, isTopic()); + replyTo = getEndpoint().getDestinationCreationStrategy().createDestination(getSession(), (String)obj, isTopic()); } else { throw new Exception("The value of JMSReplyTo must be a valid Destination or String. Value provided: " + obj); } http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultDestinationCreationStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultDestinationCreationStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultDestinationCreationStrategy.java new file mode 100644 index 0000000..267333d --- /dev/null +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultDestinationCreationStrategy.java @@ -0,0 +1,43 @@ +package org.apache.camel.component.sjms.jms; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; + +/** + * Default implementation of DestinationCreationStrategy, delegates to Session.createTopic + * and Session.createQueue. + * + * @see org.apache.camel.component.sjms.jms.DestinationCreationStrategy + * @see javax.jms.Session + */ +public class DefaultDestinationCreationStrategy implements DestinationCreationStrategy { + private static final String TOPIC_PREFIX = "topic://"; + private static final String QUEUE_PREFIX = "queue://"; + + @Override + public Destination createDestination(final Session session, String name, final boolean topic) throws JMSException { + Destination destination; + if (topic) { + if (name.startsWith(TOPIC_PREFIX)) { + name = name.substring(TOPIC_PREFIX.length()); + } + destination = session.createTopic(name); + } else { + if (name.startsWith(QUEUE_PREFIX)) { + name = name.substring(QUEUE_PREFIX.length()); + } + destination = session.createQueue(name); + } + return destination; + } + + @Override + public Destination createTemporaryDestination(final Session session, final boolean topic) throws JMSException { + if (topic) { + return session.createTemporaryTopic(); + } else { + return session.createTemporaryQueue(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationCreationStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationCreationStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationCreationStrategy.java new file mode 100644 index 0000000..a7ce3ee --- /dev/null +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationCreationStrategy.java @@ -0,0 +1,13 @@ +package org.apache.camel.component.sjms.jms; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; + +/** + * Strategy for creating Destination's + */ +public interface DestinationCreationStrategy { + Destination createDestination(Session session, String name, boolean topic) throws JMSException; + Destination createTemporaryDestination(Session session, boolean topic) throws JMSException; +} http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java index 1292398..c25bdfb 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java @@ -34,77 +34,14 @@ public final class JmsObjectFactory { //Helper class } - public static Destination createDestination(Session session, String destinationName, boolean topic) throws Exception { - if (topic) { - return createTopic(session, destinationName); - } else { - return createQueue(session, destinationName); - } - } - - public static Destination createQueue(Session session, String destinationName) throws Exception { - return session.createQueue(destinationName); - } - - public static Destination createTemporaryDestination(Session session, boolean topic) throws Exception { - if (topic) { - return session.createTemporaryTopic(); - } else { - return session.createTemporaryQueue(); - } - } - - public static Destination createTopic(Session session, String destinationName) throws Exception { - return session.createTopic(destinationName); - } - - public static MessageConsumer createQueueConsumer(Session session, String destinationName) throws Exception { - return createMessageConsumer(session, destinationName, null, false, null); - } - - public static MessageConsumer createQueueConsumer(Session session, String destinationName, String messageSelector) throws Exception { - return createMessageConsumer(session, destinationName, messageSelector, false, null); - } - - public static MessageConsumer createTopicConsumer(Session session, String destinationName, String messageSelector) throws Exception { - return createMessageConsumer(session, destinationName, messageSelector, true, null); - } - - public static MessageConsumer createTemporaryMessageConsumer( - Session session, - String messageSelector, - boolean topic, - String durableSubscriptionId, - boolean noLocal) throws Exception { - Destination destination = createTemporaryDestination(session, topic); - return createMessageConsumer(session, destination, messageSelector, topic, durableSubscriptionId, noLocal); - } - public static MessageConsumer createMessageConsumer( Session session, - String destinationName, + Destination destination, String messageSelector, boolean topic, String durableSubscriptionId) throws Exception { // noLocal is default false accordingly to JMS spec - return createMessageConsumer(session, destinationName, messageSelector, topic, durableSubscriptionId, false); - } - - public static MessageConsumer createMessageConsumer( - Session session, - String destinationName, - String messageSelector, - boolean topic, - String durableSubscriptionId, - boolean noLocal) throws Exception { - Destination destination = null; - if (topic) { - destination = session.createTopic(destinationName); - - } else { - destination = session.createQueue(destinationName); - } - return createMessageConsumer(session, destination, messageSelector, topic, durableSubscriptionId, noLocal); + return createMessageConsumer(session, destination, messageSelector, topic, durableSubscriptionId, false); } public static MessageConsumer createMessageConsumer( @@ -114,7 +51,7 @@ public final class JmsObjectFactory { boolean topic, String durableSubscriptionId, boolean noLocal) throws Exception { - MessageConsumer messageConsumer = null; + MessageConsumer messageConsumer; if (topic) { if (ObjectHelper.isNotEmpty(durableSubscriptionId)) { @@ -141,44 +78,13 @@ public final class JmsObjectFactory { return messageConsumer; } - public static MessageProducer createQueueProducer( - Session session, - String destinationName) throws Exception { - return createMessageProducer(session, destinationName, false, true, -1); - } - - public static MessageProducer createTopicProducer( - Session session, - String destinationName) throws Exception { - return createMessageProducer(session, destinationName, true, false, -1); - } - public static MessageProducer createMessageProducer( Session session, - String destinationName, - boolean topic, - boolean persitent, + Destination destination, + boolean persistent, long ttl) throws Exception { - MessageProducer messageProducer = null; - Destination destination = null; - if (topic) { - if (destinationName.startsWith("topic://")) { - destinationName = destinationName.substring("topic://".length()); - } - destination = session.createTopic(destinationName); - } else { - if (destinationName.startsWith("queue://")) { - destinationName = destinationName.substring("queue://".length()); - } - destination = session.createQueue(destinationName); - } - messageProducer = session.createProducer(destination); - - if (persitent) { - messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); - } else { - messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - } + MessageProducer messageProducer = session.createProducer(destination); + messageProducer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); if (ttl > 0) { messageProducer.setTimeToLive(ttl); } http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java index 35ef352..6ead317 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; @@ -52,33 +53,24 @@ public class InOnlyProducer extends SjmsProducer { */ @Override public MessageProducerResources doCreateProducerModel() throws Exception { - MessageProducerResources answer = null; - Connection conn = null; + MessageProducerResources answer; + Connection conn = getConnectionResource().borrowConnection(); try { - conn = getConnectionResource().borrowConnection(); TransactionCommitStrategy commitStrategy = null; - Session session; - if (isEndpointTransacted()) { - if (getCommitStrategy() != null) { - commitStrategy = getCommitStrategy(); - } else { - commitStrategy = new DefaultTransactionCommitStrategy(); - } - session = conn.createSession(true, getAcknowledgeMode()); - } else { - session = conn.createSession(false, getAcknowledgeMode()); + commitStrategy = getCommitStrategy() == null ? new DefaultTransactionCommitStrategy() : getCommitStrategy(); } - - MessageProducer messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl()); + Session session = conn.createSession(isEndpointTransacted(), getAcknowledgeMode()); + Destination destination = getEndpoint().getDestinationCreationStrategy().createDestination(session, getDestinationName(), isTopic()); + MessageProducer messageProducer = JmsObjectFactory.createMessageProducer(session, destination, isPersistent(), getTtl()); answer = new MessageProducerResources(session, messageProducer, commitStrategy); + } catch (Exception e) { log.error("Unable to create the MessageProducer", e); + throw e; } finally { - if (conn != null) { - getConnectionResource().returnConnection(conn); - } + getConnectionResource().returnConnection(conn); } return answer; } http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java index ef42035..630645d 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java @@ -46,8 +46,6 @@ import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization; import org.apache.camel.util.ObjectHelper; import org.apache.commons.pool.BasePoolableObjectFactory; import org.apache.commons.pool.impl.GenericObjectPool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A Camel Producer that provides the InOut Exchange pattern. @@ -62,29 +60,28 @@ public class InOutProducer extends SjmsProducer { private ReadWriteLock lock = new ReentrantReadWriteLock(); /** - * A pool of {@link MessageConsumerResource} objects that are the reply + * A pool of {@link MessageConsumerResources} objects that are the reply * consumers. */ protected class MessageConsumerResourcesFactory extends BasePoolableObjectFactory<MessageConsumerResources> { @Override public MessageConsumerResources makeObject() throws Exception { - MessageConsumerResources answer = null; - Connection conn = null; - Session session = null; + MessageConsumerResources answer; + Connection conn = getConnectionResource().borrowConnection(); try { - conn = getConnectionResource().borrowConnection(); + Session session; if (isEndpointTransacted()) { session = conn.createSession(true, Session.SESSION_TRANSACTED); } else { session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); } - Destination replyToDestination = null; + Destination replyToDestination; if (ObjectHelper.isEmpty(getNamedReplyTo())) { - replyToDestination = JmsObjectFactory.createTemporaryDestination(session, isTopic()); + replyToDestination = getEndpoint().getDestinationCreationStrategy().createTemporaryDestination(session, isTopic()); } else { - replyToDestination = JmsObjectFactory.createDestination(session, getNamedReplyTo(), isTopic()); + replyToDestination = getEndpoint().getDestinationCreationStrategy().createDestination(session, getNamedReplyTo(), isTopic()); } MessageConsumer messageConsumer = JmsObjectFactory.createMessageConsumer(session, replyToDestination, null, isTopic(), null, true); messageConsumer.setMessageListener(new MessageListener() { @@ -133,29 +130,6 @@ public class InOutProducer extends SjmsProducer { } } - protected class InternalTempDestinationListener implements MessageListener { - private final Logger tempLogger = LoggerFactory.getLogger(InternalTempDestinationListener.class); - private Exchanger<Object> exchanger; - - public InternalTempDestinationListener(Exchanger<Object> exchanger) { - this.exchanger = exchanger; - } - - @Override - public void onMessage(Message message) { - if (tempLogger.isDebugEnabled()) { - tempLogger.debug("Message Received in the Consumer Pool"); - tempLogger.debug(" Message : {}", message); - } - try { - exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS); - } catch (Exception e) { - ObjectHelper.wrapRuntimeCamelException(e); - } - - } - } - private GenericObjectPool<MessageConsumerResources> consumers; public InOutProducer(SjmsEndpoint endpoint) { @@ -192,36 +166,20 @@ public class InOutProducer extends SjmsProducer { @Override public MessageProducerResources doCreateProducerModel() throws Exception { - MessageProducerResources answer = null; - Connection conn = null; + MessageProducerResources answer; + Connection conn = getConnectionResource().borrowConnection(); try { - MessageProducer messageProducer = null; - Session session = null; - - conn = getConnectionResource().borrowConnection(); - if (isEndpointTransacted()) { - session = conn.createSession(true, getAcknowledgeMode()); - } else { - session = conn.createSession(false, getAcknowledgeMode()); - } - - messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl()); - - if (session == null) { - throw new CamelException("Message Consumer Creation Exception: Session is NULL"); - } - if (messageProducer == null) { - throw new CamelException("Message Consumer Creation Exception: MessageProducer is NULL"); - } + Session session = conn.createSession(isEndpointTransacted(), getAcknowledgeMode()); + Destination destination = getEndpoint().getDestinationCreationStrategy().createDestination(session, getDestinationName(), isTopic()); + MessageProducer messageProducer = JmsObjectFactory.createMessageProducer(session, destination, isPersistent(), getTtl()); answer = new MessageProducerResources(session, messageProducer); } catch (Exception e) { - log.error("Unable to create the MessageProducer: " + e.getLocalizedMessage()); + log.error("Unable to create the MessageProducer", e); + throw e; } finally { - if (conn != null) { - getConnectionResource().returnConnection(conn); - } + getConnectionResource().returnConnection(conn); } return answer; @@ -250,8 +208,8 @@ public class InOutProducer extends SjmsProducer { Object responseObject = null; Exchanger<Object> messageExchanger = new Exchanger<Object>(); SjmsExchangeMessageHelper.setCorrelationId(request, correlationId); + lock.writeLock().lock(); try { - lock.writeLock().lock(); exchangerMap.put(correlationId, messageExchanger); } finally { lock.writeLock().unlock(); @@ -274,8 +232,8 @@ public class InOutProducer extends SjmsProducer { try { responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS); + lock.writeLock().lock(); try { - lock.writeLock().lock(); exchangerMap.remove(correlationId); } finally { lock.writeLock().unlock(); http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsDestinationCreationStrategyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsDestinationCreationStrategyTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsDestinationCreationStrategyTest.java new file mode 100644 index 0000000..1477b33 --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsDestinationCreationStrategyTest.java @@ -0,0 +1,79 @@ +package org.apache.camel.component.sjms; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy; +import org.apache.camel.component.sjms.support.JmsTestSupport; +import org.apache.camel.impl.DefaultCamelContext; +import org.junit.Test; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; + +/** + * @author + */ +public class SjmsDestinationCreationStrategyTest extends JmsTestSupport { + + private boolean createDestinationCalled = false; + private boolean createTemporaryDestination = false; + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = new DefaultCamelContext(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri); + SjmsComponent component = new SjmsComponent(); + component.setConnectionFactory(connectionFactory); + component.setDestinationCreationStrategy(new TestDestinationCreationStrategyTest()); + camelContext.addComponent("sjms", component); + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("sjms:queue:inout?prefillPool=false&exchangePattern=InOut").process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getOut().setBody("response"); + } + }); + } + }; + } + + @Test + public void testSjmsComponentUsesCustomDestinationCreationStrategy() throws Exception { + assertFalse(createDestinationCalled); + template.sendBody("sjms:queue:inonly?prefillPool=false", "hello world"); + assertTrue(createDestinationCalled); + + assertFalse(createTemporaryDestination); + String response = (String)template.sendBody("sjms:queue:inout?prefillPool=false&exchangePattern=InOut", ExchangePattern.InOut, "hello world 2" ); + assertTrue(createTemporaryDestination); + assertEquals("response", response); + } + + class TestDestinationCreationStrategyTest extends DefaultDestinationCreationStrategy { + @Override + public Destination createDestination(Session session, String name, boolean topic) throws JMSException { + if (name.equals("inonly")) { + createDestinationCalled = true; + } + return super.createDestination(session, name, topic); + } + + @Override + public Destination createTemporaryDestination(Session session, boolean topic) throws JMSException { + createTemporaryDestination = true; + return super.createTemporaryDestination(session, topic); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DefaultDestinationCreationStrategyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DefaultDestinationCreationStrategyTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DefaultDestinationCreationStrategyTest.java new file mode 100644 index 0000000..159fc36 --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DefaultDestinationCreationStrategyTest.java @@ -0,0 +1,53 @@ +package org.apache.camel.component.sjms.jms; + +import org.apache.camel.component.sjms.support.JmsTestSupport; +import org.junit.Test; + +import javax.jms.Queue; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.Topic; + +/** + * @author + */ +public class DefaultDestinationCreationStrategyTest extends JmsTestSupport { + + private DestinationCreationStrategy strategy = new DefaultDestinationCreationStrategy(); + + @Test + public void testQueueCreation() throws Exception { + Queue destination = (Queue)strategy.createDestination(getSession(), "queue://test", false); + assertNotNull(destination); + assertEquals("test", destination.getQueueName()); + + destination = (Queue)strategy.createDestination(getSession(), "test", false); + assertNotNull(destination); + assertEquals("test", destination.getQueueName()); + } + + @Test + public void testTopicCreation() throws Exception { + Topic destination = (Topic)strategy.createDestination(getSession(), "topic://test", true); + assertNotNull(destination); + assertEquals("test", destination.getTopicName()); + + destination = (Topic)strategy.createDestination(getSession(), "test", true); + assertNotNull(destination); + assertEquals("test", destination.getTopicName()); + } + + @Test + public void testTemporaryQueueCreation() throws Exception { + TemporaryQueue destination = (TemporaryQueue)strategy.createTemporaryDestination(getSession(), false); + assertNotNull(destination); + assertNotNull(destination.getQueueName()); + } + + @Test + public void testTemporaryTopicCreation() throws Exception { + TemporaryTopic destination = (TemporaryTopic)strategy.createTemporaryDestination(getSession(), true); + assertNotNull(destination); + assertNotNull(destination.getTopicName()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java index 58fb33c..1ec59ca 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java @@ -22,7 +22,6 @@ import javax.jms.TextMessage; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.component.sjms.support.JmsTestSupport; import org.junit.Test; @@ -41,7 +40,7 @@ public class InOnlyQueueProducerTest extends JmsTestSupport { @Test public void testInOnlyQueueProducer() throws Exception { - MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME); + MessageConsumer mc = createQueueConsumer(TEST_DESTINATION_NAME); assertNotNull(mc); final String expectedBody = "Hello World!"; MockEndpoint mock = getMockEndpoint("mock:result"); http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java index 459ffea..d62a88d 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java @@ -22,7 +22,6 @@ import javax.jms.TextMessage; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.component.sjms.support.JmsTestSupport; import org.junit.Test; @@ -41,7 +40,7 @@ public class InOnlyTopicProducerTest extends JmsTestSupport { @Test public void testInOnlyTopicProducerProducer() throws Exception { - MessageConsumer mc = JmsObjectFactory.createTopicConsumer(getSession(), TEST_DESTINATION_NAME, null); + MessageConsumer mc = createTopicConsumer(TEST_DESTINATION_NAME, null); assertNotNull(mc); final String expectedBody = "Hello World!"; MockEndpoint mock = getMockEndpoint("mock:result"); http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java index 072b407..ba46d06 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java @@ -27,7 +27,6 @@ import javax.jms.MessageProducer; import javax.jms.TextMessage; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.component.sjms.support.JmsTestSupport; import org.junit.Test; @@ -49,8 +48,8 @@ public class InOutQueueProducerAsyncLoadTest extends JmsTestSupport { @Override public void setUp() throws Exception { super.setUp(); - mc1 = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME + ".request"); - mc2 = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME + ".request"); + mc1 = createQueueConsumer(TEST_DESTINATION_NAME + ".request"); + mc2 = createQueueConsumer(TEST_DESTINATION_NAME + ".request"); mc1.setMessageListener(new MyMessageListener()); mc2.setMessageListener(new MyMessageListener()); } http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java index f3033df..42ba4fd 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java @@ -27,7 +27,6 @@ import javax.jms.MessageProducer; import javax.jms.TextMessage; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.component.sjms.support.JmsTestSupport; import org.junit.Test; @@ -49,8 +48,8 @@ public class InOutQueueProducerSyncLoadTest extends JmsTestSupport { @Override public void setUp() throws Exception { super.setUp(); - mc1 = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME + ".request"); - mc2 = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME + ".request"); + mc1 = createQueueConsumer(TEST_DESTINATION_NAME + ".request"); + mc2 = createQueueConsumer(TEST_DESTINATION_NAME + ".request"); mc1.setMessageListener(new MyMessageListener()); mc2.setMessageListener(new MyMessageListener()); } http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java index 5488c94..5e04371 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java @@ -28,7 +28,6 @@ import javax.jms.TextMessage; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.component.sjms.support.JmsTestSupport; import org.junit.Test; @@ -47,8 +46,7 @@ public class InOutQueueProducerTest extends JmsTestSupport { @Test public void testInOutQueueProducer() throws Exception { - MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME - + ".request"); + MessageConsumer mc = createQueueConsumer(TEST_DESTINATION_NAME + ".request"); assertNotNull(mc); final String requestText = "Hello World!"; final String responseText = "How are you"; @@ -63,8 +61,7 @@ public class InOutQueueProducerTest extends JmsTestSupport { @Test public void testInOutQueueProducerWithCorrelationId() throws Exception { - MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME - + ".request"); + MessageConsumer mc = createQueueConsumer(TEST_DESTINATION_NAME + ".request"); assertNotNull(mc); final String requestText = "Hello World!"; final String responseText = "How are you"; http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java index 5970e54..f4f6a4d 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java @@ -27,7 +27,6 @@ import javax.jms.TextMessage; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.component.sjms.support.JmsTestSupport; import org.junit.Test; @@ -42,7 +41,7 @@ public class InOutTempQueueProducerTest extends JmsTestSupport { @Test public void testInOutQueueProducer() throws Exception { String queueName = "in.out.queue.producer.test.request"; - MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), queueName); + MessageConsumer mc = createQueueConsumer(queueName); assertNotNull(mc); final String requestText = "Hello World!"; final String responseText = "How are you"; @@ -58,7 +57,7 @@ public class InOutTempQueueProducerTest extends JmsTestSupport { @Test public void testInOutQueueProducerWithCorrelationId() throws Exception { String queueName = "in.out.queue.producer.test.request"; - MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), queueName); + MessageConsumer mc = createQueueConsumer(queueName); assertNotNull(mc); final String requestText = "Hello World!"; final String responseText = "How are you"; http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java index 07063d1..7bb4942 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java @@ -22,7 +22,6 @@ import javax.jms.TextMessage; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.component.sjms.support.JmsTestSupport; import org.junit.Test; @@ -41,7 +40,7 @@ public class QueueProducerTest extends JmsTestSupport { @Test public void testQueueProducer() throws Exception { - MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME); + MessageConsumer mc = createQueueConsumer(TEST_DESTINATION_NAME); assertNotNull(mc); final String expectedBody = "Hello World!"; MockEndpoint mock = getMockEndpoint("mock:result"); http://git-wip-us.apache.org/repos/asf/camel/blob/cb784c84/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java index a04fb07..8245ed9 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java @@ -17,6 +17,8 @@ package org.apache.camel.component.sjms.support; import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; import javax.jms.Session; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; @@ -28,6 +30,9 @@ import org.apache.camel.CamelContext; import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; import org.apache.camel.component.sjms.SjmsComponent; +import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy; +import org.apache.camel.component.sjms.jms.DestinationCreationStrategy; +import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit4.CamelTestSupport; @@ -45,6 +50,7 @@ public class JmsTestSupport extends CamelTestSupport { private BrokerService broker; private Connection connection; private Session session; + private DestinationCreationStrategy destinationCreationStrategy = new DefaultDestinationCreationStrategy(); /** * Set up the Broker @@ -57,10 +63,8 @@ public class JmsTestSupport extends CamelTestSupport { protected void doPreSetup() throws Exception { deleteDirectory("target/activemq-data"); broker = new BrokerService(); - final int port = AvailablePortFinder.getNextAvailable(33333); + int port = AvailablePortFinder.getNextAvailable(33333); brokerUri = "tcp://localhost:" + port; - //Disable the JMX by default - broker.setUseJmx(false); broker.getManagementContext().setConnectorPort(AvailablePortFinder.getNextAvailable(port + 1)); configureBroker(broker); startBroker(); @@ -139,4 +143,12 @@ public class JmsTestSupport extends CamelTestSupport { public Session getSession() { return session; } + + public MessageConsumer createQueueConsumer(String destination) throws Exception { + return JmsObjectFactory.createMessageConsumer(session, destinationCreationStrategy.createDestination(session, destination, false), null, false, null); + } + + public MessageConsumer createTopicConsumer(String destination, String messageSelector) throws Exception { + return JmsObjectFactory.createMessageConsumer(session, destinationCreationStrategy.createDestination(session, destination, true), messageSelector, true, null); + } }