Repository: camel Updated Branches: refs/heads/master b0c572fe6 -> 2cff2f15f
CAMEL-7727: Unify MessageProducerResources handling into SjmsProducer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f683b0b7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f683b0b7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f683b0b7 Branch: refs/heads/master Commit: f683b0b7b1933fa492ab4f7a456691d90b17698c Parents: b0c572f Author: Cristiano Nicolai <cristiano.nico...@gmail.com> Authored: Wed Aug 20 21:32:05 2014 +1000 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Thu Aug 21 10:08:04 2014 +0800 ---------------------------------------------------------------------- .../camel/component/sjms/SjmsProducer.java | 39 +++--- .../component/sjms/producer/InOnlyProducer.java | 53 ++++---- .../component/sjms/producer/InOutProducer.java | 130 +++++++++---------- 3 files changed, 105 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f683b0b7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java index 1a31818..3edc761 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java @@ -195,7 +195,7 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { public abstract MessageProducerResources doCreateProducerModel() throws Exception; - public abstract void sendMessage(Exchange exchange, final AsyncCallback callback) throws Exception; + public abstract void sendMessage(Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception; @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { @@ -204,25 +204,30 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { } try { - if (!isSynchronous()) { - if (log.isDebugEnabled()) { - log.debug(" Sending message asynchronously: {}", exchange.getIn().getBody()); - } - getExecutor().execute(new Runnable() { - @Override - public void run() { - try { - sendMessage(exchange, callback); - } catch (Exception e) { - ObjectHelper.wrapRuntimeCamelException(e); + final MessageProducerResources producer = getProducers().borrowObject(getResponseTimeOut()); + if(producer==null){ + exchange.setException(new Exception("Unable to send message: connection not available")); + } else { + if (!isSynchronous()) { + if (log.isDebugEnabled()) { + log.debug(" Sending message asynchronously: {}", exchange.getIn().getBody()); + } + getExecutor().execute(new Runnable() { + @Override + public void run() { + try { + sendMessage(exchange, callback, producer); + } catch (Exception e) { + ObjectHelper.wrapRuntimeCamelException(e); + } } + }); + } else { + if (log.isDebugEnabled()) { + log.debug(" Sending message synchronously: {}", exchange.getIn().getBody()); } - }); - } else { - if (log.isDebugEnabled()) { - log.debug(" Sending message synchronously: {}", exchange.getIn().getBody()); + sendMessage(exchange, callback, producer); } - sendMessage(exchange, callback); } } catch (Exception e) { if (log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/camel/blob/f683b0b7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java index e841e6b..c83546f 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java @@ -91,41 +91,36 @@ public class InOnlyProducer extends SjmsProducer { * @throws Exception */ @Override - public void sendMessage(final Exchange exchange, final AsyncCallback callback) throws Exception { - Collection<Message> messages = new ArrayList<Message>(1); - MessageProducerResources producer = getProducers().borrowObject(); + public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception { try { - if (producer != null) { - if (exchange.getIn().getBody() != null) { - if (exchange.getIn().getBody() instanceof List) { - Iterable<?> payload = (Iterable<?>)exchange.getIn().getBody(); - for (final Object object : payload) { - Message message; - if (BatchMessage.class.isInstance(object)) { - BatchMessage<?> batchMessage = (BatchMessage<?>)object; - message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint() - .getJmsKeyFormatStrategy()); - } else { - message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy()); - } - messages.add(message); + Collection<Message> messages = new ArrayList<Message>(1); + if (exchange.getIn().getBody() != null) { + if (exchange.getIn().getBody() instanceof List) { + Iterable<?> payload = (Iterable<?>)exchange.getIn().getBody(); + for (final Object object : payload) { + Message message; + if (BatchMessage.class.isInstance(object)) { + BatchMessage<?> batchMessage = (BatchMessage<?>)object; + message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint() + .getJmsKeyFormatStrategy()); + } else { + message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy()); } - } else { - Object payload = exchange.getIn().getBody(); - Message message = JmsMessageHelper - .createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy()); messages.add(message); } + } else { + Object payload = exchange.getIn().getBody(); + Message message = JmsMessageHelper + .createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy()); + messages.add(message); } + } - if (isEndpointTransacted()) { - exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy())); - } - for (final Message message : messages) { - producer.getMessageProducer().send(message); - } - } else { - exchange.setException(new Exception("Unable to send message: connection not available")); + if (isEndpointTransacted()) { + exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy())); + } + for (final Message message : messages) { + producer.getMessageProducer().send(message); } } catch (Exception e) { exchange.setException(new Exception("Unable to complete sending the message: " + e.getLocalizedMessage())); http://git-wip-us.apache.org/repos/asf/camel/blob/f683b0b7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java index 2b93df7..605b15c 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java @@ -282,86 +282,74 @@ public class InOutProducer extends SjmsProducer { * @throws Exception */ @Override - public void sendMessage(final Exchange exchange, final AsyncCallback callback) throws Exception { - if (getProducers() != null) { - MessageProducerResources producer = null; - try { - producer = getProducers().borrowObject(getResponseTimeOut()); - } catch (Exception e1) { - log.warn("The producer pool is exhausted. Consider setting producerCount to a higher value or disable the fixed size of the pool by setting fixedResourcePool=false."); - exchange.setException(new Exception("Producer Resource Pool is exhausted")); - } - if (producer != null) { - - if (isEndpointTransacted()) { - exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy())); - } + public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception { + if (isEndpointTransacted()) { + exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy())); + } - Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy()); + Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy()); - // TODO just set the correlation id don't get it from the - // message - String correlationId = null; - if (exchange.getIn().getHeader("JMSCorrelationID", String.class) == null) { - correlationId = UUID.randomUUID().toString().replace("-", ""); - } else { - correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class); - } - Object responseObject = null; - Exchanger<Object> messageExchanger = new Exchanger<Object>(); - SjmsExchangeMessageHelper.setCorrelationId(request, correlationId); - try { - lock.writeLock().lock(); - exchangerMap.put(correlationId, messageExchanger); - } finally { - lock.writeLock().unlock(); - } + // TODO just set the correlation id don't get it from the + // message + String correlationId = null; + if (exchange.getIn().getHeader("JMSCorrelationID", String.class) == null) { + correlationId = UUID.randomUUID().toString().replace("-", ""); + } else { + correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class); + } + Object responseObject = null; + Exchanger<Object> messageExchanger = new Exchanger<Object>(); + SjmsExchangeMessageHelper.setCorrelationId(request, correlationId); + try { + lock.writeLock().lock(); + exchangerMap.put(correlationId, messageExchanger); + } finally { + lock.writeLock().unlock(); + } - MessageConsumerResource consumer = consumers.borrowObject(getResponseTimeOut()); - SjmsExchangeMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination()); - consumers.returnObject(consumer); - producer.getMessageProducer().send(request); - - // Return the producer to the pool so another waiting producer - // can move forward - // without waiting on us to complete the exchange - try { - getProducers().returnObject(producer); - } catch (Exception exception) { - // thrown if the pool is full. safe to ignore. - } + MessageConsumerResource consumer = consumers.borrowObject(getResponseTimeOut()); + SjmsExchangeMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination()); + consumers.returnObject(consumer); + producer.getMessageProducer().send(request); - try { - responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS); + // Return the producer to the pool so another waiting producer + // can move forward + // without waiting on us to complete the exchange + try { + getProducers().returnObject(producer); + } catch (Exception exception) { + // thrown if the pool is full. safe to ignore. + } - try { - lock.writeLock().lock(); - exchangerMap.remove(correlationId); - } finally { - lock.writeLock().unlock(); - } - } catch (InterruptedException e) { - log.debug("Exchanger was interrupted while waiting on response", e); - exchange.setException(e); - } catch (TimeoutException e) { - log.debug("Exchanger timed out while waiting on response", e); - exchange.setException(e); - } + try { + responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS); - if (exchange.getException() == null) { - if (responseObject instanceof Throwable) { - exchange.setException((Throwable)responseObject); - } else if (responseObject instanceof Message) { - Message response = (Message)responseObject; - SjmsExchangeMessageHelper.populateExchange(response, exchange, true); - } else { - exchange.setException(new CamelException("Unknown response type: " + responseObject)); - } - } + try { + lock.writeLock().lock(); + exchangerMap.remove(correlationId); + } finally { + lock.writeLock().unlock(); } + } catch (InterruptedException e) { + log.debug("Exchanger was interrupted while waiting on response", e); + exchange.setException(e); + } catch (TimeoutException e) { + log.debug("Exchanger timed out while waiting on response", e); + exchange.setException(e); + } - callback.done(isSynchronous()); + if (exchange.getException() == null) { + if (responseObject instanceof Throwable) { + exchange.setException((Throwable)responseObject); + } else if (responseObject instanceof Message) { + Message response = (Message)responseObject; + SjmsExchangeMessageHelper.populateExchange(response, exchange, true); + } else { + exchange.setException(new CamelException("Unknown response type: " + responseObject)); + } } + + callback.done(isSynchronous()); } public void setConsumers(MessageConsumerPool consumers) {