Fix CAMEL-7949: JmsMessageHelper to support automatic conversion from ByteBuffer to BytesMessage
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1ff65c5d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1ff65c5d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1ff65c5d Branch: refs/heads/master Commit: 1ff65c5d18c82fbe26a1cf24977c2f30df8f9754 Parents: 36e3c9d Author: ancosen <anco...@gmail.com> Authored: Sat Nov 8 20:30:27 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Nov 9 08:16:12 2014 +0100 ---------------------------------------------------------------------- .../component/sjms/SjmsExchangeMessageHelper.java | 15 ++++++++------- .../sjms/consumer/InOutMessageHandler.java | 2 +- .../camel/component/sjms/jms/JmsMessageHelper.java | 17 +++++++++-------- .../component/sjms/producer/InOnlyProducer.java | 6 +++--- .../component/sjms/producer/InOutProducer.java | 2 +- .../sjms/consumer/AsyncConsumerInOutTest.java | 8 ++++++-- 6 files changed, 28 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1ff65c5d/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java index 656452a..f58c570 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java @@ -16,6 +16,11 @@ */ package org.apache.camel.component.sjms; +import static org.apache.camel.component.sjms.SjmsConstants.JMS_MESSAGE_TYPE; +import static org.apache.camel.component.sjms.SjmsConstants.QUEUE_PREFIX; +import static 
org.apache.camel.component.sjms.SjmsConstants.TOPIC_PREFIX; +import static org.apache.camel.util.ObjectHelper.removeStartingCharacters; + import java.io.ByteArrayOutputStream; import java.util.Enumeration; import java.util.HashMap; @@ -37,6 +42,7 @@ import javax.jms.TextMessage; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.TypeConverter; import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy; import org.apache.camel.component.sjms.jms.IllegalHeaderException; import org.apache.camel.component.sjms.jms.JmsConstants; @@ -50,11 +56,6 @@ import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.camel.component.sjms.SjmsConstants.JMS_MESSAGE_TYPE; -import static org.apache.camel.component.sjms.SjmsConstants.QUEUE_PREFIX; -import static org.apache.camel.component.sjms.SjmsConstants.TOPIC_PREFIX; -import static org.apache.camel.util.ObjectHelper.removeStartingCharacters; - public final class SjmsExchangeMessageHelper { private static final Logger LOGGER = LoggerFactory.getLogger(SjmsExchangeMessageHelper.class); @@ -430,7 +431,7 @@ public final class SjmsExchangeMessageHelper { return exchange; } - public static Message createMessage(Exchange exchange, Session session, KeyFormatStrategy keyFormatStrategy) throws Exception { + public static Message createMessage(Exchange exchange, Session session, KeyFormatStrategy keyFormatStrategy, TypeConverter typeConverter) throws Exception { Message answer = null; Object body = null; Map<String, Object> bodyHeaders = null; @@ -444,7 +445,7 @@ public final class SjmsExchangeMessageHelper { bodyHeaders = new HashMap<String, Object>(exchange.getIn().getHeaders()); } - answer = JmsMessageHelper.createMessage(session, body, bodyHeaders, keyFormatStrategy); + answer = JmsMessageHelper.createMessage(session, body, bodyHeaders, keyFormatStrategy, typeConverter); 
return answer; } http://git-wip-us.apache.org/repos/asf/camel/blob/1ff65c5d/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java index 663f39f..e68c278 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java @@ -159,7 +159,7 @@ public class InOutMessageHandler extends AbstractMessageHandler { @Override public void done(boolean sync) { try { - Message response = SjmsExchangeMessageHelper.createMessage(exchange, getSession(), ((SjmsEndpoint) getEndpoint()).getJmsKeyFormatStrategy()); + Message response = SjmsExchangeMessageHelper.createMessage(exchange, getSession(), ((SjmsEndpoint) getEndpoint()).getJmsKeyFormatStrategy(), ((SjmsEndpoint) getEndpoint()).getCamelContext().getTypeConverter()); response.setJMSCorrelationID(exchange.getIn().getHeader("JMSCorrelationID", String.class)); localProducer.send(response); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/camel/blob/1ff65c5d/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java index 3c83585..d57009a 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java +++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Set; + import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -34,6 +35,7 @@ import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; +import org.apache.camel.TypeConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,16 +90,16 @@ public final class JmsMessageHelper { private JmsMessageHelper() { } - @SuppressWarnings("unchecked") - public static Message createMessage(Session session, Object payload, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws Exception { + //@SuppressWarnings("unchecked") + public static Message createMessage(Session session, Object payload, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy, TypeConverter typeConverter) throws Exception { Message answer = null; - JmsMessageType messageType = JmsMessageHelper.discoverMessgeTypeFromPayload(payload); + JmsMessageType messageType = JmsMessageHelper.discoverMessageTypeFromPayload(payload); try { switch (messageType) { case Bytes: - BytesMessage bytesMessage = session.createBytesMessage(); - bytesMessage.writeBytes((byte[]) payload); + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage = typeConverter.convertTo(BytesMessage.class, payload); answer = bytesMessage; break; case Map: @@ -122,7 +124,7 @@ public final class JmsMessageHelper { break; case Stream: ByteArrayOutputStream baos = new ByteArrayOutputStream(); - InputStream is = (InputStream) payload; + InputStream is = typeConverter.convertTo(InputStream.class, payload); int reads = is.read(); while (reads != -1) { baos.write(reads); @@ -371,7 +373,7 @@ public final class JmsMessageHelper { } } - public static JmsMessageType discoverMessgeTypeFromPayload(final Object payload) { 
+ public static JmsMessageType discoverMessageTypeFromPayload(final Object payload) { JmsMessageType answer = null; // Default is a JMS Message since a body is not required if (payload == null) { @@ -393,7 +395,6 @@ public final class JmsMessageHelper { answer = JmsMessageType.Message; } } - return answer; } http://git-wip-us.apache.org/repos/asf/camel/blob/1ff65c5d/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java index 6ead317..d4db3cd 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java @@ -94,16 +94,16 @@ public class InOnlyProducer extends SjmsProducer { if (BatchMessage.class.isInstance(object)) { BatchMessage<?> batchMessage = (BatchMessage<?>) object; message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint() - .getJmsKeyFormatStrategy()); + .getJmsKeyFormatStrategy(), getSjmsEndpoint().getCamelContext().getTypeConverter()); } else { - message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy()); + message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy(),getSjmsEndpoint().getCamelContext().getTypeConverter()); } messages.add(message); } } else { Object payload = exchange.getIn().getBody(); Message message = JmsMessageHelper - .createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), 
getSjmsEndpoint().getJmsKeyFormatStrategy()); + .createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy(),getSjmsEndpoint().getCamelContext().getTypeConverter()); messages.add(message); } } http://git-wip-us.apache.org/repos/asf/camel/blob/1ff65c5d/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java index 630645d..be336b3 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java @@ -195,7 +195,7 @@ public class InOutProducer extends SjmsProducer { exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy())); } - Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy()); + Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy(), getSjmsEndpoint().getCamelContext().getTypeConverter()); // TODO just set the correlation id don't get it from the // message http://git-wip-us.apache.org/repos/asf/camel/blob/1ff65c5d/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/AsyncConsumerInOutTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/AsyncConsumerInOutTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/AsyncConsumerInOutTest.java index 
940c8f2..b5e7e39 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/AsyncConsumerInOutTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/AsyncConsumerInOutTest.java @@ -16,13 +16,17 @@ */ package org.apache.camel.component.sjms.consumer; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.Arrays; +import java.util.HashMap; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.sjms.SjmsComponent; import org.apache.camel.component.sjms.support.MyAsyncComponent; import org.apache.camel.test.junit4.CamelTestSupport; - import org.junit.Test; /** @@ -43,7 +47,7 @@ public class AsyncConsumerInOutTest extends CamelTestSupport { assertMockEndpointsSatisfied(); } - + protected CamelContext createCamelContext() throws Exception { CamelContext camelContext = super.createCamelContext();