Repository: camel Updated Branches: refs/heads/master 424dcdad3 -> a93140cc9
CAMEL-10617: camel-sjms - Async start consumer should defer starting endpoint Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a93140cc Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a93140cc Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a93140cc Branch: refs/heads/master Commit: a93140cc9f3e67ae15d58bf6bc4d8eeef6cbdd71 Parents: 424dcda Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Dec 19 19:17:28 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Dec 19 19:17:28 2016 +0100 ---------------------------------------------------------------------- .../camel/component/sjms/SjmsConsumer.java | 22 +++++++--- .../camel/component/sjms/SjmsEndpoint.java | 46 +++++++++++++++----- .../camel/component/sjms/SjmsProducer.java | 22 +++++++--- .../component/sjms/producer/InOutProducer.java | 10 ++--- 4 files changed, 70 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a93140cc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java index e0c7984..f6064c2 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java @@ -105,11 +105,8 @@ public class SjmsConsumer extends DefaultConsumer { @Override protected void doStart() throws Exception { - if (getConnectionResource() == null) { - throw new IllegalArgumentException(String.format("ConnectionResource or ConnectionFactory must be configured for %s", this)); - } - super.doStart(); + this.executor = 
getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsConsumer"); if (consumers == null) { consumers = new GenericObjectPool<MessageConsumerResources>(new MessageConsumerResourcesFactory()); @@ -183,7 +180,8 @@ public class SjmsConsumer extends DefaultConsumer { */ private MessageConsumerResources createConsumer() throws Exception { MessageConsumerResources answer; - Connection conn = getConnectionResource().borrowConnection(); + ConnectionResource connectionResource = getOrCreateConnectionResource(); + Connection conn = connectionResource.borrowConnection(); try { Session session = conn.createSession(isTransacted(), isTransacted() ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); Destination destination = getEndpoint().getDestinationCreationStrategy().createDestination(session, getDestinationName(), isTopic()); @@ -196,7 +194,7 @@ public class SjmsConsumer extends DefaultConsumer { log.error("Unable to create the MessageConsumer", e); throw e; } finally { - getConnectionResource().returnConnection(conn); + connectionResource.returnConnection(conn); } return answer; } @@ -250,10 +248,22 @@ public class SjmsConsumer extends DefaultConsumer { return messageHandler; } + /** + * @deprecated use {@link #getOrCreateConnectionResource()} + */ + @Deprecated protected ConnectionResource getConnectionResource() { return getEndpoint().getConnectionResource(); } + protected ConnectionResource getOrCreateConnectionResource() { + ConnectionResource answer = getEndpoint().getConnectionResource(); + if (answer == null) { + answer = getEndpoint().createConnectionResource(); + } + return answer; + } + public int getAcknowledgementMode() { return getEndpoint().getAcknowledgementMode().intValue(); } http://git-wip-us.apache.org/repos/asf/camel/blob/a93140cc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java ---------------------------------------------------------------------- diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java index 858c68d..35a2d89 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java @@ -48,6 +48,7 @@ import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.camel.util.EndpointHelper; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,17 +147,18 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult @Override protected void doStart() throws Exception { super.doStart(); - if (getConnectionResource() == null) { - if (getConnectionFactory() != null) { - // We always use a connection pool, even for a pool of 1 - ConnectionFactoryResource connections = new ConnectionFactoryResource(getConnectionCount(), getConnectionFactory()); - connections.fillPool(); - connectionResource = connections; - // we created the resource so we should close it when stopping - closeConnectionResource = true; + + if (!isAsyncStartListener()) { + // if we are not async starting then create connection eager + if (getConnectionResource() == null) { + if (getConnectionFactory() != null) { + connectionResource = createConnectionResource(); + // we created the resource so we should close it when stopping + closeConnectionResource = true; + } + } else if (getConnectionResource() instanceof ConnectionFactoryResource) { + ((ConnectionFactoryResource) getConnectionResource()).fillPool(); } - } else if (getConnectionResource() instanceof ConnectionFactoryResource) { - ((ConnectionFactoryResource) getConnectionResource()).fillPool(); } } @@ -200,6 +202,22 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult 
return true; } + protected ConnectionResource createConnectionResource() { + if (getConnectionFactory() == null) { + throw new IllegalArgumentException(String.format("ConnectionResource or ConnectionFactory must be configured for %s", this)); + } + + try { + logger.debug("Creating ConnectionResource with connectionCount: {} using ConnectionFactory", getConnectionCount(), getConnectionFactory()); + // We always use a connection pool, even for a pool of 1 + ConnectionFactoryResource connections = new ConnectionFactoryResource(getConnectionCount(), getConnectionFactory()); + connections.fillPool(); + return connections; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + public Exchange createExchange(Message message, Session session) { Exchange exchange = createExchange(getExchangePattern()); exchange.setIn(new SjmsMessage(message, session, getBinding())); @@ -267,10 +285,14 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult } public ConnectionResource getConnectionResource() { + ConnectionResource answer = null; if (connectionResource != null) { - return connectionResource; + answer = connectionResource; } - return getComponent().getConnectionResource(); + if (answer == null) { + answer = getComponent().getConnectionResource(); + } + return answer; } /** http://git-wip-us.apache.org/repos/asf/camel/blob/a93140cc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java index 91515db..585a6c4 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java @@ -84,11 +84,8 @@ public abstract class 
SjmsProducer extends DefaultAsyncProducer { @Override protected void doStart() throws Exception { - if (getConnectionResource() == null) { - throw new IllegalArgumentException(String.format("ConnectionResource or ConnectionFactory must be configured for %s", this)); - } - super.doStart(); + this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsProducer"); if (getProducers() == null) { setProducers(new GenericObjectPool<MessageProducerResources>(new MessageProducerResourcesFactory())); @@ -180,14 +177,15 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { } protected Session createSession() throws Exception { - Connection conn = getConnectionResource().borrowConnection(); + ConnectionResource connectionResource = getOrCreateConnectionResource(); + Connection conn = connectionResource.borrowConnection(); try { return conn.createSession(isEndpointTransacted(), getAcknowledgeMode()); } catch (Exception e) { log.error("Unable to create the Session", e); throw e; } finally { - getConnectionResource().returnConnection(conn); + connectionResource.returnConnection(conn); } } @@ -278,10 +276,22 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { return isSynchronous(); } + /** + * @deprecated use {@link #getOrCreateConnectionResource()} + */ + @Deprecated protected ConnectionResource getConnectionResource() { return getEndpoint().getConnectionResource(); } + protected ConnectionResource getOrCreateConnectionResource() { + ConnectionResource answer = getEndpoint().getConnectionResource(); + if (answer == null) { + answer = getEndpoint().createConnectionResource(); + } + return answer; + } + /** * Gets the acknowledgment mode for this instance of DestinationProducer. 
* http://git-wip-us.apache.org/repos/asf/camel/blob/a93140cc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java index a901400..fef9532 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java @@ -36,6 +36,7 @@ import org.apache.camel.component.sjms.MessageProducerResources; import org.apache.camel.component.sjms.SjmsEndpoint; import org.apache.camel.component.sjms.SjmsMessage; import org.apache.camel.component.sjms.SjmsProducer; +import org.apache.camel.component.sjms.jms.ConnectionResource; import org.apache.camel.component.sjms.jms.JmsConstants; import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.component.sjms.jms.JmsObjectFactory; @@ -75,7 +76,8 @@ public class InOutProducer extends SjmsProducer { @Override public MessageConsumerResources makeObject() throws Exception { MessageConsumerResources answer; - Connection conn = getConnectionResource().borrowConnection(); + ConnectionResource connectionResource = getOrCreateConnectionResource(); + Connection conn = connectionResource.borrowConnection(); try { Session session; if (isEndpointTransacted()) { @@ -111,7 +113,7 @@ public class InOutProducer extends SjmsProducer { log.error("Unable to create the MessageConsumerResource: " + e.getLocalizedMessage()); throw new CamelException(e); } finally { - getConnectionResource().returnConnection(conn); + connectionResource.returnConnection(conn); } return answer; } @@ -142,10 +144,6 @@ public class InOutProducer extends SjmsProducer { throw new 
IllegalArgumentException("InOut exchange pattern is incompatible with transacted=true as it cuases a deadlock. Please use transacted=false or InOnly exchange pattern."); } - if (getConnectionResource() == null) { - throw new IllegalArgumentException(String.format("ConnectionResource or ConnectionFactory must be configured for %s", this)); - } - if (ObjectHelper.isEmpty(getNamedReplyTo())) { log.debug("No reply to destination is defined. Using temporary destinations."); } else {