Repository: camel
Updated Branches:
  refs/heads/master b0c572fe6 -> 2cff2f15f


CAMEL-7727: Unify MessageProducerResources handling into SjmsProducer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f683b0b7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f683b0b7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f683b0b7

Branch: refs/heads/master
Commit: f683b0b7b1933fa492ab4f7a456691d90b17698c
Parents: b0c572f
Author: Cristiano Nicolai <cristiano.nico...@gmail.com>
Authored: Wed Aug 20 21:32:05 2014 +1000
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Thu Aug 21 10:08:04 2014 +0800

----------------------------------------------------------------------
 .../camel/component/sjms/SjmsProducer.java      |  39 +++---
 .../component/sjms/producer/InOnlyProducer.java |  53 ++++----
 .../component/sjms/producer/InOutProducer.java  | 130 +++++++++----------
 3 files changed, 105 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f683b0b7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 1a31818..3edc761 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -195,7 +195,7 @@ public abstract class SjmsProducer extends 
DefaultAsyncProducer {
 
     public abstract MessageProducerResources doCreateProducerModel() throws 
Exception;
 
-    public abstract void sendMessage(Exchange exchange, final AsyncCallback 
callback) throws Exception;
+    public abstract void sendMessage(Exchange exchange, final AsyncCallback 
callback, final MessageProducerResources producer) throws Exception;
 
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
@@ -204,25 +204,30 @@ public abstract class SjmsProducer extends 
DefaultAsyncProducer {
         }
 
         try {
-            if (!isSynchronous()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("  Sending message asynchronously: {}", 
exchange.getIn().getBody());
-                }
-                getExecutor().execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            sendMessage(exchange, callback);
-                        } catch (Exception e) {
-                            ObjectHelper.wrapRuntimeCamelException(e);
+            final MessageProducerResources producer = 
getProducers().borrowObject(getResponseTimeOut());
+            if(producer==null){
+                exchange.setException(new Exception("Unable to send message: 
connection not available"));
+            } else {
+                if (!isSynchronous()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("  Sending message asynchronously: {}", 
exchange.getIn().getBody());
+                    }
+                    getExecutor().execute(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                sendMessage(exchange, callback, producer);
+                            } catch (Exception e) {
+                                ObjectHelper.wrapRuntimeCamelException(e);
+                            }
                         }
+                    });
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("  Sending message synchronously: {}", 
exchange.getIn().getBody());
                     }
-                });
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("  Sending message synchronously: {}", 
exchange.getIn().getBody());
+                    sendMessage(exchange, callback, producer);
                 }
-                sendMessage(exchange, callback);
             }
         } catch (Exception e) {
             if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/f683b0b7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index e841e6b..c83546f 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -91,41 +91,36 @@ public class InOnlyProducer extends SjmsProducer {
      * @throws Exception
      */
     @Override
-    public void sendMessage(final Exchange exchange, final AsyncCallback 
callback) throws Exception {
-        Collection<Message> messages = new ArrayList<Message>(1);
-        MessageProducerResources producer = getProducers().borrowObject();
+    public void sendMessage(final Exchange exchange, final AsyncCallback 
callback, final MessageProducerResources producer) throws Exception {
         try {
-            if (producer != null) {
-                if (exchange.getIn().getBody() != null) {
-                    if (exchange.getIn().getBody() instanceof List) {
-                        Iterable<?> payload = 
(Iterable<?>)exchange.getIn().getBody();
-                        for (final Object object : payload) {
-                            Message message;
-                            if (BatchMessage.class.isInstance(object)) {
-                                BatchMessage<?> batchMessage = 
(BatchMessage<?>)object;
-                                message = 
JmsMessageHelper.createMessage(producer.getSession(), 
batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
-                                    .getJmsKeyFormatStrategy());
-                            } else {
-                                message = 
JmsMessageHelper.createMessage(producer.getSession(), object, 
exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
-                            }
-                            messages.add(message);
+            Collection<Message> messages = new ArrayList<Message>(1);
+            if (exchange.getIn().getBody() != null) {
+                if (exchange.getIn().getBody() instanceof List) {
+                    Iterable<?> payload = 
(Iterable<?>)exchange.getIn().getBody();
+                    for (final Object object : payload) {
+                        Message message;
+                        if (BatchMessage.class.isInstance(object)) {
+                            BatchMessage<?> batchMessage = 
(BatchMessage<?>)object;
+                            message = 
JmsMessageHelper.createMessage(producer.getSession(), 
batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
+                                .getJmsKeyFormatStrategy());
+                        } else {
+                            message = 
JmsMessageHelper.createMessage(producer.getSession(), object, 
exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
                         }
-                    } else {
-                        Object payload = exchange.getIn().getBody();
-                        Message message = JmsMessageHelper
-                            .createMessage(producer.getSession(), payload, 
exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
                         messages.add(message);
                     }
+                } else {
+                    Object payload = exchange.getIn().getBody();
+                    Message message = JmsMessageHelper
+                        .createMessage(producer.getSession(), payload, 
exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
+                    messages.add(message);
                 }
+            }
 
-                if (isEndpointTransacted()) {
-                    exchange.getUnitOfWork().addSynchronization(new 
SessionTransactionSynchronization(producer.getSession(), 
producer.getCommitStrategy()));
-                }
-                for (final Message message : messages) {
-                    producer.getMessageProducer().send(message);
-                }
-            } else {
-                exchange.setException(new Exception("Unable to send message: 
connection not available"));
+            if (isEndpointTransacted()) {
+                exchange.getUnitOfWork().addSynchronization(new 
SessionTransactionSynchronization(producer.getSession(), 
producer.getCommitStrategy()));
+            }
+            for (final Message message : messages) {
+                producer.getMessageProducer().send(message);
             }
         } catch (Exception e) {
             exchange.setException(new Exception("Unable to complete sending 
the message: " + e.getLocalizedMessage()));

http://git-wip-us.apache.org/repos/asf/camel/blob/f683b0b7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index 2b93df7..605b15c 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -282,86 +282,74 @@ public class InOutProducer extends SjmsProducer {
      * @throws Exception
      */
     @Override
-    public void sendMessage(final Exchange exchange, final AsyncCallback 
callback) throws Exception {
-        if (getProducers() != null) {
-            MessageProducerResources producer = null;
-            try {
-                producer = getProducers().borrowObject(getResponseTimeOut());
-            } catch (Exception e1) {
-                log.warn("The producer pool is exhausted.  Consider setting 
producerCount to a higher value or disable the fixed size of the pool by 
setting fixedResourcePool=false.");
-                exchange.setException(new Exception("Producer Resource Pool is 
exhausted"));
-            }
-            if (producer != null) {
-
-                if (isEndpointTransacted()) {
-                    exchange.getUnitOfWork().addSynchronization(new 
SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
-                }
+    public void sendMessage(final Exchange exchange, final AsyncCallback 
callback, final MessageProducerResources producer) throws Exception {
+        if (isEndpointTransacted()) {
+            exchange.getUnitOfWork().addSynchronization(new 
SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
+        }
 
-                Message request = 
SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), 
getSjmsEndpoint().getJmsKeyFormatStrategy());
+        Message request = SjmsExchangeMessageHelper.createMessage(exchange, 
producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy());
 
-                // TODO just set the correlation id don't get it from the
-                // message
-                String correlationId = null;
-                if (exchange.getIn().getHeader("JMSCorrelationID", 
String.class) == null) {
-                    correlationId = UUID.randomUUID().toString().replace("-", 
"");
-                } else {
-                    correlationId = 
exchange.getIn().getHeader("JMSCorrelationID", String.class);
-                }
-                Object responseObject = null;
-                Exchanger<Object> messageExchanger = new Exchanger<Object>();
-                SjmsExchangeMessageHelper.setCorrelationId(request, 
correlationId);
-                try {
-                    lock.writeLock().lock();
-                    exchangerMap.put(correlationId, messageExchanger);
-                } finally {
-                    lock.writeLock().unlock();
-                }
+        // TODO just set the correlation id don't get it from the
+        // message
+        String correlationId = null;
+        if (exchange.getIn().getHeader("JMSCorrelationID", String.class) == 
null) {
+            correlationId = UUID.randomUUID().toString().replace("-", "");
+        } else {
+            correlationId = exchange.getIn().getHeader("JMSCorrelationID", 
String.class);
+        }
+        Object responseObject = null;
+        Exchanger<Object> messageExchanger = new Exchanger<Object>();
+        SjmsExchangeMessageHelper.setCorrelationId(request, correlationId);
+        try {
+            lock.writeLock().lock();
+            exchangerMap.put(correlationId, messageExchanger);
+        } finally {
+            lock.writeLock().unlock();
+        }
 
-                MessageConsumerResource consumer = 
consumers.borrowObject(getResponseTimeOut());
-                SjmsExchangeMessageHelper.setJMSReplyTo(request, 
consumer.getReplyToDestination());
-                consumers.returnObject(consumer);
-                producer.getMessageProducer().send(request);
-
-                // Return the producer to the pool so another waiting producer
-                // can move forward
-                // without waiting on us to complete the exchange
-                try {
-                    getProducers().returnObject(producer);
-                } catch (Exception exception) {
-                    // thrown if the pool is full. safe to ignore.
-                }
+        MessageConsumerResource consumer = 
consumers.borrowObject(getResponseTimeOut());
+        SjmsExchangeMessageHelper.setJMSReplyTo(request, 
consumer.getReplyToDestination());
+        consumers.returnObject(consumer);
+        producer.getMessageProducer().send(request);
 
-                try {
-                    responseObject = messageExchanger.exchange(null, 
getResponseTimeOut(), TimeUnit.MILLISECONDS);
+        // Return the producer to the pool so another waiting producer
+        // can move forward
+        // without waiting on us to complete the exchange
+        try {
+            getProducers().returnObject(producer);
+        } catch (Exception exception) {
+            // thrown if the pool is full. safe to ignore.
+        }
 
-                    try {
-                        lock.writeLock().lock();
-                        exchangerMap.remove(correlationId);
-                    } finally {
-                        lock.writeLock().unlock();
-                    }
-                } catch (InterruptedException e) {
-                    log.debug("Exchanger was interrupted while waiting on 
response", e);
-                    exchange.setException(e);
-                } catch (TimeoutException e) {
-                    log.debug("Exchanger timed out while waiting on response", 
e);
-                    exchange.setException(e);
-                }
+        try {
+            responseObject = messageExchanger.exchange(null, 
getResponseTimeOut(), TimeUnit.MILLISECONDS);
 
-                if (exchange.getException() == null) {
-                    if (responseObject instanceof Throwable) {
-                        exchange.setException((Throwable)responseObject);
-                    } else if (responseObject instanceof Message) {
-                        Message response = (Message)responseObject;
-                        SjmsExchangeMessageHelper.populateExchange(response, 
exchange, true);
-                    } else {
-                        exchange.setException(new CamelException("Unknown 
response type: " + responseObject));
-                    }
-                }
+            try {
+                lock.writeLock().lock();
+                exchangerMap.remove(correlationId);
+            } finally {
+                lock.writeLock().unlock();
             }
+        } catch (InterruptedException e) {
+            log.debug("Exchanger was interrupted while waiting on response", 
e);
+            exchange.setException(e);
+        } catch (TimeoutException e) {
+            log.debug("Exchanger timed out while waiting on response", e);
+            exchange.setException(e);
+        }
 
-            callback.done(isSynchronous());
+        if (exchange.getException() == null) {
+            if (responseObject instanceof Throwable) {
+                exchange.setException((Throwable)responseObject);
+            } else if (responseObject instanceof Message) {
+                Message response = (Message)responseObject;
+                SjmsExchangeMessageHelper.populateExchange(response, exchange, 
true);
+            } else {
+                exchange.setException(new CamelException("Unknown response 
type: " + responseObject));
+            }
         }
+
+        callback.done(isSynchronous());
     }
 
     public void setConsumers(MessageConsumerPool consumers) {

Reply via email to