Repository: camel
Updated Branches:
  refs/heads/master 424dcdad3 -> a93140cc9


CAMEL-10617: camel-sjms - Async start consumer should defer starting endpoint


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a93140cc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a93140cc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a93140cc

Branch: refs/heads/master
Commit: a93140cc9f3e67ae15d58bf6bc4d8eeef6cbdd71
Parents: 424dcda
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon Dec 19 19:17:28 2016 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Dec 19 19:17:28 2016 +0100

----------------------------------------------------------------------
 .../camel/component/sjms/SjmsConsumer.java      | 22 +++++++---
 .../camel/component/sjms/SjmsEndpoint.java      | 46 +++++++++++++++-----
 .../camel/component/sjms/SjmsProducer.java      | 22 +++++++---
 .../component/sjms/producer/InOutProducer.java  | 10 ++---
 4 files changed, 70 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a93140cc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
index e0c7984..f6064c2 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
@@ -105,11 +105,8 @@ public class SjmsConsumer extends DefaultConsumer {
 
     @Override
     protected void doStart() throws Exception {
-        if (getConnectionResource() == null) {
-            throw new 
IllegalArgumentException(String.format("ConnectionResource or ConnectionFactory 
must be configured for %s", this));
-        }
-
         super.doStart();
+
         this.executor = 
getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this,
 "SjmsConsumer");
         if (consumers == null) {
             consumers = new GenericObjectPool<MessageConsumerResources>(new 
MessageConsumerResourcesFactory());
@@ -183,7 +180,8 @@ public class SjmsConsumer extends DefaultConsumer {
      */
     private MessageConsumerResources createConsumer() throws Exception {
         MessageConsumerResources answer;
-        Connection conn = getConnectionResource().borrowConnection();
+        ConnectionResource connectionResource = 
getOrCreateConnectionResource();
+        Connection conn = connectionResource.borrowConnection();
         try {
             Session session = conn.createSession(isTransacted(), 
isTransacted() ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
             Destination destination = 
getEndpoint().getDestinationCreationStrategy().createDestination(session, 
getDestinationName(), isTopic());
@@ -196,7 +194,7 @@ public class SjmsConsumer extends DefaultConsumer {
             log.error("Unable to create the MessageConsumer", e);
             throw e;
         } finally {
-            getConnectionResource().returnConnection(conn);
+            connectionResource.returnConnection(conn);
         }
         return answer;
     }
@@ -250,10 +248,22 @@ public class SjmsConsumer extends DefaultConsumer {
         return messageHandler;
     }
 
+    /**
+     * @deprecated use {@link #getOrCreateConnectionResource()}
+     */
+    @Deprecated
     protected ConnectionResource getConnectionResource() {
         return getEndpoint().getConnectionResource();
     }
 
+    protected ConnectionResource getOrCreateConnectionResource() {
+        ConnectionResource answer = getEndpoint().getConnectionResource();
+        if (answer == null) {
+            answer = getEndpoint().createConnectionResource();
+        }
+        return answer;
+    }
+
     public int getAcknowledgementMode() {
         return getEndpoint().getAcknowledgementMode().intValue();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/a93140cc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 858c68d..35a2d89 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -48,6 +48,7 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.util.EndpointHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -146,17 +147,18 @@ public class SjmsEndpoint extends DefaultEndpoint 
implements AsyncEndpoint, Mult
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        if (getConnectionResource() == null) {
-            if (getConnectionFactory() != null) {
-                // We always use a connection pool, even for a pool of 1
-                ConnectionFactoryResource connections = new 
ConnectionFactoryResource(getConnectionCount(), getConnectionFactory());
-                connections.fillPool();
-                connectionResource = connections;
-                // we created the resource so we should close it when stopping
-                closeConnectionResource = true;
+
+        if (!isAsyncStartListener()) {
+            // if we are not async starting then create connection eager
+            if (getConnectionResource() == null) {
+                if (getConnectionFactory() != null) {
+                    connectionResource = createConnectionResource();
+                    // we created the resource so we should close it when 
stopping
+                    closeConnectionResource = true;
+                }
+            } else if (getConnectionResource() instanceof 
ConnectionFactoryResource) {
+                ((ConnectionFactoryResource) 
getConnectionResource()).fillPool();
             }
-        } else if (getConnectionResource() instanceof 
ConnectionFactoryResource) {
-            ((ConnectionFactoryResource) getConnectionResource()).fillPool();
         }
     }
 
@@ -200,6 +202,22 @@ public class SjmsEndpoint extends DefaultEndpoint 
implements AsyncEndpoint, Mult
         return true;
     }
 
+    protected ConnectionResource createConnectionResource() {
+        if (getConnectionFactory() == null) {
+            throw new 
IllegalArgumentException(String.format("ConnectionResource or ConnectionFactory 
must be configured for %s", this));
+        }
+
+        try {
+            logger.debug("Creating ConnectionResource with connectionCount: {} 
using ConnectionFactory", getConnectionCount(), getConnectionFactory());
+            // We always use a connection pool, even for a pool of 1
+            ConnectionFactoryResource connections = new 
ConnectionFactoryResource(getConnectionCount(), getConnectionFactory());
+            connections.fillPool();
+            return connections;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
     public Exchange createExchange(Message message, Session session) {
         Exchange exchange = createExchange(getExchangePattern());
         exchange.setIn(new SjmsMessage(message, session, getBinding()));
@@ -267,10 +285,14 @@ public class SjmsEndpoint extends DefaultEndpoint 
implements AsyncEndpoint, Mult
     }
 
     public ConnectionResource getConnectionResource() {
+        ConnectionResource answer = null;
         if (connectionResource != null) {
-            return connectionResource;
+            answer = connectionResource;
         }
-        return getComponent().getConnectionResource();
+        if (answer == null) {
+            answer = getComponent().getConnectionResource();
+        }
+        return answer;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/a93140cc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 91515db..585a6c4 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -84,11 +84,8 @@ public abstract class SjmsProducer extends 
DefaultAsyncProducer {
 
     @Override
     protected void doStart() throws Exception {
-        if (getConnectionResource() == null) {
-            throw new 
IllegalArgumentException(String.format("ConnectionResource or ConnectionFactory 
must be configured for %s", this));
-        }
-
         super.doStart();
+
         this.executor = 
getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this,
 "SjmsProducer");
         if (getProducers() == null) {
             setProducers(new GenericObjectPool<MessageProducerResources>(new 
MessageProducerResourcesFactory()));
@@ -180,14 +177,15 @@ public abstract class SjmsProducer extends 
DefaultAsyncProducer {
     }
 
     protected Session createSession() throws Exception {
-        Connection conn = getConnectionResource().borrowConnection();
+        ConnectionResource connectionResource = 
getOrCreateConnectionResource();
+        Connection conn = connectionResource.borrowConnection();
         try {
             return conn.createSession(isEndpointTransacted(), 
getAcknowledgeMode());
         } catch (Exception e) {
             log.error("Unable to create the Session", e);
             throw e;
         } finally {
-            getConnectionResource().returnConnection(conn);
+            connectionResource.returnConnection(conn);
         }
     }
 
@@ -278,10 +276,22 @@ public abstract class SjmsProducer extends 
DefaultAsyncProducer {
         return isSynchronous();
     }
 
+    /**
+     * @deprecated use {@link #getOrCreateConnectionResource()}
+     */
+    @Deprecated
     protected ConnectionResource getConnectionResource() {
         return getEndpoint().getConnectionResource();
     }
 
+    protected ConnectionResource getOrCreateConnectionResource() {
+        ConnectionResource answer = getEndpoint().getConnectionResource();
+        if (answer == null) {
+            answer = getEndpoint().createConnectionResource();
+        }
+        return answer;
+    }
+
     /**
      * Gets the acknowledgment mode for this instance of DestinationProducer.
      *

http://git-wip-us.apache.org/repos/asf/camel/blob/a93140cc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index a901400..fef9532 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -36,6 +36,7 @@ import 
org.apache.camel.component.sjms.MessageProducerResources;
 import org.apache.camel.component.sjms.SjmsEndpoint;
 import org.apache.camel.component.sjms.SjmsMessage;
 import org.apache.camel.component.sjms.SjmsProducer;
+import org.apache.camel.component.sjms.jms.ConnectionResource;
 import org.apache.camel.component.sjms.jms.JmsConstants;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.component.sjms.jms.JmsObjectFactory;
@@ -75,7 +76,8 @@ public class InOutProducer extends SjmsProducer {
         @Override
         public MessageConsumerResources makeObject() throws Exception {
             MessageConsumerResources answer;
-            Connection conn = getConnectionResource().borrowConnection();
+            ConnectionResource connectionResource = 
getOrCreateConnectionResource();
+            Connection conn = connectionResource.borrowConnection();
             try {
                 Session session;
                 if (isEndpointTransacted()) {
@@ -111,7 +113,7 @@ public class InOutProducer extends SjmsProducer {
                 log.error("Unable to create the MessageConsumerResource: " + 
e.getLocalizedMessage());
                 throw new CamelException(e);
             } finally {
-                getConnectionResource().returnConnection(conn);
+                connectionResource.returnConnection(conn);
             }
             return answer;
         }
@@ -142,10 +144,6 @@ public class InOutProducer extends SjmsProducer {
             throw new IllegalArgumentException("InOut exchange pattern is 
incompatible with transacted=true as it cuases a deadlock. Please use 
transacted=false or InOnly exchange pattern.");
         }
 
-        if (getConnectionResource() == null) {
-            throw new 
IllegalArgumentException(String.format("ConnectionResource or ConnectionFactory 
must be configured for %s", this));
-        }
-
         if (ObjectHelper.isEmpty(getNamedReplyTo())) {
             log.debug("No reply to destination is defined.  Using temporary 
destinations.");
         } else {

Reply via email to