CAMEL-7949: Harden sjms component. Thanks to Aaron Whiteside for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1dc631ee
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1dc631ee
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1dc631ee

Branch: refs/heads/master
Commit: 1dc631ee9860da773371c16d2fa6982ae92b13d7
Parents: 3e1951a
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Nov 29 10:31:16 2014 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Nov 29 10:31:16 2014 +0100

----------------------------------------------------------------------
 .../camel/component/sjms/SjmsConstants.java     |  13 +-
 .../camel/component/sjms/SjmsEndpoint.java      |   2 +-
 .../sjms/SjmsExchangeMessageHelper.java         | 467 -------------------
 .../sjms/consumer/AbstractMessageHandler.java   |  12 +-
 .../sjms/consumer/InOnlyMessageHandler.java     |   8 +-
 .../sjms/consumer/InOutMessageHandler.java      |  27 +-
 .../camel/component/sjms/jms/JmsConstants.java  |  62 ++-
 .../sjms/jms/JmsMessageHeaderType.java          |  36 --
 .../component/sjms/jms/JmsMessageHelper.java    | 321 +++++++++----
 .../component/sjms/producer/InOnlyProducer.java |  12 +-
 .../component/sjms/producer/InOutProducer.java  |  59 +--
 .../JMSMessageHelperTypeConversionTest.java     |  67 ++-
 12 files changed, 384 insertions(+), 702 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
index 035c93a..b39e51e 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
@@ -16,17 +16,8 @@
  */
 package org.apache.camel.component.sjms;
 
-public final class SjmsConstants {
-    public static final String QUEUE_PREFIX = "queue:";
-    public static final String TOPIC_PREFIX = "topic:";
-    public static final String TEMP_QUEUE_PREFIX = "temp:queue:";
-    public static final String TEMP_TOPIC_PREFIX = "temp:topic:";
+public interface SjmsConstants {
 
     public static final String JMS_MESSAGE_TYPE = "JmsMessageType";
-    public static final String ORIGINAL_MESSAGE = "SjmsOriginalMessage";
 
-    private SjmsConstants() {
-        // Helper class
-    }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 6284c13..6841d77 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -121,7 +121,7 @@ public class SjmsEndpoint extends DefaultEndpoint 
implements MultipleConsumersSu
 
     @Override
     public Producer createProducer() throws Exception {
-        SjmsProducer producer = null;
+        SjmsProducer producer;
         if (getExchangePattern().equals(ExchangePattern.InOnly)) {
             producer = new InOnlyProducer(this);
         } else {

http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java
index 434a3c2..e69de29 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java
@@ -1,467 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.sjms;
-
-import java.io.ByteArrayOutputStream;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
-import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy;
-import org.apache.camel.component.sjms.jms.IllegalHeaderException;
-import org.apache.camel.component.sjms.jms.JmsConstants;
-import org.apache.camel.component.sjms.jms.JmsMessageHeaderType;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
-import org.apache.camel.component.sjms.jms.JmsMessageType;
-import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
-import org.apache.camel.impl.DefaultMessage;
-import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.camel.component.sjms.SjmsConstants.JMS_MESSAGE_TYPE;
-import static org.apache.camel.component.sjms.SjmsConstants.QUEUE_PREFIX;
-import static org.apache.camel.component.sjms.SjmsConstants.TOPIC_PREFIX;
-import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
-
-public final class SjmsExchangeMessageHelper {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(SjmsExchangeMessageHelper.class);
-
-    private SjmsExchangeMessageHelper() {
-    }
-
-    public static Exchange createExchange(Message message, Endpoint endpoint) {
-        Exchange exchange = endpoint.createExchange();
-        return populateExchange(message, exchange, false, 
((SjmsEndpoint)endpoint).getJmsKeyFormatStrategy());
-    }
-    
-    @Deprecated 
-    /**
-     * Please use the one which has the parameter of keyFormatStrategy 
-     */
-    public static Exchange populateExchange(Message message, Exchange 
exchange, boolean out) {
-        return populateExchange(message, exchange, out, new 
DefaultJmsKeyFormatStrategy()); 
-    }
-
-    @SuppressWarnings("unchecked")
-    public static Exchange populateExchange(Message message, Exchange 
exchange, boolean out, KeyFormatStrategy keyFormatStrategy) {
-        try {
-            SjmsExchangeMessageHelper.setJmsMessageHeaders(message, exchange, 
out, keyFormatStrategy);
-            if (message != null) {
-                // convert to JMS Message of the given type
-
-                DefaultMessage bodyMessage = null;
-                if (out) {
-                    bodyMessage = (DefaultMessage) exchange.getOut();
-                } else {
-                    bodyMessage = (DefaultMessage) exchange.getIn();
-                }
-                switch (JmsMessageHelper.discoverJmsMessageType(message)) {
-                case Bytes:
-                    BytesMessage bytesMessage = (BytesMessage) message;
-                    if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) {
-                        LOGGER.warn("Length of BytesMessage is too long: {}", 
bytesMessage.getBodyLength());
-                        return null;
-                    }
-                    byte[] result = new byte[(int) 
bytesMessage.getBodyLength()];
-                    bytesMessage.readBytes(result);
-                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, 
JmsMessageType.Bytes);
-                    bodyMessage.setBody(result);
-                    break;
-                case Map:
-                    Map<String, Object> body = new HashMap<String, Object>();
-                    MapMessage mapMessage = (MapMessage) message;
-                    Enumeration<String> names = mapMessage.getMapNames();
-                    while (names.hasMoreElements()) {
-                        String key = names.nextElement();
-                        Object value = mapMessage.getObject(key);
-                        body.put(key, value);
-                    }
-                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, 
JmsMessageType.Map);
-                    bodyMessage.setBody(body);
-                    break;
-                case Object:
-                    ObjectMessage objMsg = (ObjectMessage) message;
-                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, 
JmsMessageType.Object);
-                    bodyMessage.setBody(objMsg.getObject());
-                    break;
-                case Text:
-                    TextMessage textMsg = (TextMessage) message;
-                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, 
JmsMessageType.Text);
-                    bodyMessage.setBody(textMsg.getText());
-                    break;
-                case Stream:
-                    StreamMessage streamMessage = (StreamMessage) message;
-                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                    int next = streamMessage.readByte();
-                    while (next > -1) {
-                        baos.write(next);
-                        next = streamMessage.readByte();
-                    }
-                    baos.flush();
-                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, 
JmsMessageType.Bytes);
-                    bodyMessage.setBody(baos.toByteArray());
-                    break;
-                case Message:
-                default:
-                    // Do nothing. Only set the headers for an empty message
-                    bodyMessage.setBody(message);
-                    break;
-                }
-            }
-        } catch (Exception e) {
-            exchange.setException(e);
-        }
-        return exchange;
-    }
-
-    /**
-     * Removes the property from the JMS message.
-     *
-     * @param jmsMessage the JMS message
-     * @param name       name of the property to remove
-     * @return the old value of the property or <tt>null</tt> if not exists
-     * @throws JMSException can be thrown
-     */
-    public static Object removeJmsProperty(Message jmsMessage, String name) 
throws JMSException {
-        // check if the property exists
-        if (!jmsMessage.propertyExists(name)) {
-            return null;
-        }
-
-        Object answer = null;
-
-        // store the properties we want to keep in a temporary map
-        // as the JMS API is a bit strict as we are not allowed to
-        // clear a single property, but must clear them all and redo
-        // the properties
-        Map<String, Object> map = new LinkedHashMap<String, Object>();
-        Enumeration<?> en = jmsMessage.getPropertyNames();
-        while (en.hasMoreElements()) {
-            String key = (String) en.nextElement();
-            if (name.equals(key)) {
-                answer = key;
-            } else {
-                map.put(key, jmsMessage.getObjectProperty(key));
-            }
-        }
-
-        // redo the properties to keep
-        jmsMessage.clearProperties();
-        for (Entry<String, Object> entry : map.entrySet()) {
-            jmsMessage.setObjectProperty(entry.getKey(), entry.getValue());
-        }
-
-        return answer;
-    }
-
-    /**
-     * Tests whether a given property with the name exists
-     *
-     * @param jmsMessage the JMS message
-     * @param name       name of the property to test if exists
-     * @return <tt>true</tt> if the property exists, <tt>false</tt> if not.
-     * @throws JMSException can be thrown
-     */
-    public static boolean hasProperty(Message jmsMessage, String name) throws 
JMSException {
-        Enumeration<?> en = jmsMessage.getPropertyNames();
-        while (en.hasMoreElements()) {
-            String key = (String) en.nextElement();
-            if (name.equals(key)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Sets the property on the given JMS message.
-     *
-     * @param jmsMessage the JMS message
-     * @param name       name of the property to set
-     * @param value      the value
-     * @throws JMSException can be thrown
-     */
-    public static void setProperty(Message jmsMessage, String name, Object 
value) throws JMSException {
-        if (value == null) {
-            return;
-        }
-        if (value instanceof Byte) {
-            jmsMessage.setByteProperty(name, (Byte) value);
-        } else if (value instanceof Boolean) {
-            jmsMessage.setBooleanProperty(name, (Boolean) value);
-        } else if (value instanceof Double) {
-            jmsMessage.setDoubleProperty(name, (Double) value);
-        } else if (value instanceof Float) {
-            jmsMessage.setFloatProperty(name, (Float) value);
-        } else if (value instanceof Integer) {
-            jmsMessage.setIntProperty(name, (Integer) value);
-        } else if (value instanceof Long) {
-            jmsMessage.setLongProperty(name, (Long) value);
-        } else if (value instanceof Short) {
-            jmsMessage.setShortProperty(name, (Short) value);
-        } else if (value instanceof String) {
-            jmsMessage.setStringProperty(name, (String) value);
-        } else {
-            // fallback to Object
-            jmsMessage.setObjectProperty(name, value);
-        }
-    }
-
-    /**
-     * Sets the correlation id on the JMS message.
-     * <p/>
-     * Will ignore exception thrown
-     *
-     * @param message       the JMS message
-     * @param correlationId the correlation id
-     */
-    public static void setCorrelationId(Message message, String correlationId) 
{
-        try {
-            message.setJMSCorrelationID(correlationId);
-        } catch (JMSException e) {
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Error setting the correlationId: {}", 
correlationId);
-            }
-        }
-    }
-
-    /**
-     * Normalizes the destination name, by removing any leading queue or topic
-     * prefixes.
-     *
-     * @param destination the destination
-     * @return the normalized destination
-     */
-    public static String normalizeDestinationName(String destination) {
-        if (ObjectHelper.isEmpty(destination)) {
-            return destination;
-        }
-        if (destination.startsWith(QUEUE_PREFIX)) {
-            return 
removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
-        } else if (destination.startsWith(TOPIC_PREFIX)) {
-            return 
removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
-        } else {
-            return destination;
-        }
-    }
-
-    /**
-     * Sets the JMSReplyTo on the message.
-     *
-     * @param message the message
-     * @param replyTo the reply to destination
-     */
-    public static void setJMSReplyTo(Message message, Destination replyTo) {
-        try {
-            message.setJMSReplyTo(replyTo);
-        } catch (Exception e) {
-            LOGGER.debug("Error setting the correlationId: {}", 
replyTo.toString());
-        }
-    }
-
-    /**
-     * Gets the JMSReplyTo from the message.
-     *
-     * @param message the message
-     * @return the reply to, can be <tt>null</tt>
-     */
-    public static Destination getJMSReplyTo(Message message) {
-        try {
-            return message.getJMSReplyTo();
-        } catch (Exception e) {
-            // ignore due OracleAQ does not support accessing JMSReplyTo
-        }
-
-        return null;
-    }
-
-    /**
-     * Gets the JMSType from the message.
-     *
-     * @param message the message
-     * @return the type, can be <tt>null</tt>
-     */
-    public static String getJMSType(Message message) {
-        try {
-            return message.getJMSType();
-        } catch (Exception e) {
-            // ignore due OracleAQ does not support accessing JMSType
-        }
-
-        return null;
-    }
-
-    /**
-     * Gets the JMSRedelivered from the message.
-     *
-     * @param message the message
-     * @return <tt>true</tt> if redelivered, <tt>false</tt> if not,
-     * <tt>null</tt> if not able to determine
-     */
-    public static Boolean getJMSRedelivered(Message message) {
-        try {
-            return message.getJMSRedelivered();
-        } catch (Exception e) {
-            // ignore if JMS broker do not support this
-        }
-
-        return null;
-    }
-
-    /**
-     * Sets the JMSDeliveryMode on the message.
-     *
-     * @param exchange     the exchange
-     * @param message      the message
-     * @param deliveryMode the delivery mode, either as a String or integer
-     * @throws javax.jms.JMSException is thrown if error setting the delivery
-     *                                mode
-     */
-    public static void setJMSDeliveryMode(Exchange exchange, Message message, 
Object deliveryMode) throws JMSException {
-        Integer mode = null;
-
-        if (deliveryMode instanceof String) {
-            String s = (String) deliveryMode;
-            if ("PERSISTENT".equalsIgnoreCase(s)) {
-                mode = DeliveryMode.PERSISTENT;
-            } else if ("NON_PERSISTENT".equalsIgnoreCase(s)) {
-                mode = DeliveryMode.NON_PERSISTENT;
-            } else {
-                // it may be a number in the String so try that
-                Integer value = ExchangeHelper.convertToType(exchange, 
Integer.class, deliveryMode);
-                if (value != null) {
-                    mode = value;
-                } else {
-                    throw new IllegalArgumentException("Unknown delivery mode 
with value: " + deliveryMode);
-                }
-            }
-        } else {
-            // fallback and try to convert to a number
-            Integer value = ExchangeHelper.convertToType(exchange, 
Integer.class, deliveryMode);
-            if (value != null) {
-                mode = value;
-            }
-        }
-
-        if (mode != null) {
-            message.setJMSDeliveryMode(mode);
-            message.setIntProperty(JmsConstants.JMS_DELIVERY_MODE, mode);
-        }
-    }
-    
-    @Deprecated
-    /**
-     * Please use the one which has the parameter of keyFormatStrategy
-     */
-    public static Exchange setJmsMessageHeaders(final Message jmsMessage, 
final Exchange exchange, boolean out) throws JMSException {
-        return setJmsMessageHeaders(jmsMessage, exchange, out, new 
DefaultJmsKeyFormatStrategy());
-    }
-
-    @SuppressWarnings("unchecked")
-    public static Exchange setJmsMessageHeaders(final Message jmsMessage, 
final Exchange exchange, boolean out, KeyFormatStrategy keyFormatStrategy) 
throws JMSException {
-        Map<String, Object> headers = new HashMap<String, Object>();
-        if (jmsMessage != null) {
-            // lets populate the standard JMS message headers
-            try {
-                headers.put(JmsMessageHeaderType.JMSCorrelationID.toString(), 
jmsMessage.getJMSCorrelationID());
-                headers.put(JmsMessageHeaderType.JMSDeliveryMode.toString(), 
jmsMessage.getJMSDeliveryMode());
-                headers.put(JmsMessageHeaderType.JMSDestination.toString(), 
jmsMessage.getJMSDestination());
-                headers.put(JmsMessageHeaderType.JMSExpiration.toString(), 
jmsMessage.getJMSExpiration());
-                headers.put(JmsMessageHeaderType.JMSMessageID.toString(), 
jmsMessage.getJMSMessageID());
-                headers.put(JmsMessageHeaderType.JMSPriority.toString(), 
jmsMessage.getJMSPriority());
-                headers.put(JmsMessageHeaderType.JMSRedelivered.toString(), 
jmsMessage.getJMSRedelivered());
-                headers.put(JmsMessageHeaderType.JMSTimestamp.toString(), 
jmsMessage.getJMSTimestamp());
-                headers.put(JmsMessageHeaderType.JMSReplyTo.toString(), 
SjmsExchangeMessageHelper.getJMSReplyTo(jmsMessage));
-                headers.put(JmsMessageHeaderType.JMSType.toString(), 
SjmsExchangeMessageHelper.getJMSType(jmsMessage));
-
-                // this works around a bug in the ActiveMQ property handling
-                headers.put(JmsMessageHeaderType.JMSXGroupID.toString(), 
jmsMessage.getStringProperty(JmsMessageHeaderType.JMSXGroupID.toString()));
-            } catch (JMSException e) {
-                throw new RuntimeCamelException(e);
-            }
-
-            for (Enumeration<String> enumeration = 
jmsMessage.getPropertyNames(); enumeration.hasMoreElements();) {
-                String key = enumeration.nextElement();
-                if (hasIllegalHeaderKey(key)) {
-                    throw new IllegalHeaderException("Header " + key + " is 
not a legal JMS header name value");
-                }
-                Object value = jmsMessage.getObjectProperty(key);
-                String decodedName = keyFormatStrategy.decodeKey(key);
-                headers.put(decodedName, value);
-            }
-        }
-        if (out) {
-            exchange.getOut().setHeaders(headers);
-        } else {
-            exchange.getIn().setHeaders(headers);
-        }
-        return exchange;
-    }
-
-    public static Message createMessage(Exchange exchange, Session session, 
KeyFormatStrategy keyFormatStrategy, TypeConverter typeConverter) throws 
Exception {
-        Message answer = null;
-        Object body = null;
-        Map<String, Object> bodyHeaders = null;
-
-
-        if (exchange.getOut().getBody() != null) {
-            body = exchange.getOut().getBody();
-            bodyHeaders = new HashMap<String, 
Object>(exchange.getOut().getHeaders());
-        } else {
-            body = exchange.getIn().getBody();
-            bodyHeaders = new HashMap<String, 
Object>(exchange.getIn().getHeaders());
-        }
-
-        answer = JmsMessageHelper.createMessage(session, body, bodyHeaders, 
keyFormatStrategy, typeConverter);
-
-        return answer;
-    }
-
-    private static boolean hasIllegalHeaderKey(String key) {
-        if (key == null) {
-            return true;
-        }
-        if (key.equals("")) {
-            return true;
-        }
-        if (key.indexOf(".") > -1) {
-            return true;
-        }
-        if (key.indexOf("-") > -1) {
-            return true;
-        }
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
index 763b2a7..c3c5e55 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
@@ -25,8 +25,7 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.sjms.SjmsEndpoint;
-import org.apache.camel.component.sjms.SjmsExchangeMessageHelper;
-import org.apache.camel.component.sjms.TransactionCommitStrategy;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.spi.Synchronization;
 import org.slf4j.Logger;
@@ -50,7 +49,6 @@ public abstract class AbstractMessageHandler implements 
MessageListener {
     private boolean synchronous = true;
     private Synchronization synchronization;
     private boolean topic;
-    private TransactionCommitStrategy commitStrategy;
 
     public AbstractMessageHandler(SjmsEndpoint endpoint, ExecutorService 
executor) {
         this.endpoint = endpoint;
@@ -72,7 +70,7 @@ public abstract class AbstractMessageHandler implements 
MessageListener {
     public void onMessage(Message message) {
         RuntimeCamelException rce = null;
         try {
-            final DefaultExchange exchange = (DefaultExchange) 
SjmsExchangeMessageHelper.createExchange(message, getEndpoint());
+            final DefaultExchange exchange = (DefaultExchange) 
JmsMessageHelper.createExchange(message, getEndpoint());
 
             log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
 
@@ -92,9 +90,7 @@ public abstract class AbstractMessageHandler implements 
MessageListener {
                                 handleMessage(exchange);
                             } catch (Exception e) {
                                 exchange.setException(e);
-//                                ObjectHelper.wrapRuntimeCamelException(e);
                             }
-
                         }
                     });
                 }
@@ -169,8 +165,4 @@ public abstract class AbstractMessageHandler implements 
MessageListener {
     public boolean isTopic() {
         return topic;
     }
-
-    public TransactionCommitStrategy getCommitStrategy() {
-        return commitStrategy;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
index da658b7..ae27d27 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
@@ -46,16 +46,14 @@ public class InOnlyMessageHandler extends 
AbstractMessageHandler {
     }
 
     /**
-     * @param message
+     * @param exchange
      */
     @Override
     public void handleMessage(final Exchange exchange) {
         if (log.isDebugEnabled()) {
             log.debug("Handling InOnly Message: {}", 
exchange.getIn().getBody());
         }
-        if (exchange.isFailed()) {
-            return;
-        } else {
+        if (!exchange.isFailed()) {
             NoOpAsyncCallback callback = new NoOpAsyncCallback();
             if (isTransacted() || isSynchronous()) {
                 // must process synchronous if transacted or configured to
@@ -72,7 +70,7 @@ public class InOnlyMessageHandler extends 
AbstractMessageHandler {
                 }
             } else {
                 // process asynchronous using the async routing engine
-                log.debug("Aynchronous processing: Message[{}], 
Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri());
+                log.debug("Asynchronous processing: Message[{}], 
Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri());
 
                 getProcessor().process(exchange, callback);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
index 9be8d2f..25a0f1d 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
@@ -31,10 +31,9 @@ import javax.jms.Topic;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.sjms.SjmsEndpoint;
-import org.apache.camel.component.sjms.SjmsExchangeMessageHelper;
+import org.apache.camel.component.sjms.jms.JmsConstants;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.spi.Synchronization;
-import org.apache.camel.util.ObjectHelper;
 
 /**
  * cache manager to store and purge unused cashed producers or we will have a
@@ -88,7 +87,7 @@ public class InOutMessageHandler extends 
AbstractMessageHandler {
                 }
             }
 
-            MessageHanderAsyncCallback callback = new 
MessageHanderAsyncCallback(exchange, messageProducer);
+            MessageHandlerAsyncCallback callback = new 
MessageHandlerAsyncCallback(exchange, messageProducer);
             if (exchange.isFailed()) {
                 return;
             } else {
@@ -105,7 +104,7 @@ public class InOutMessageHandler extends 
AbstractMessageHandler {
                     }
                 } else {
                     // process asynchronous using the async routing engine
-                    log.debug("Aynchronous processing: Message[{}], 
Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri());
+                    log.debug("Asynchronous processing: Message[{}], 
Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri());
                     getProcessor().process(exchange, callback);
                 }
             }
@@ -120,12 +119,11 @@ public class InOutMessageHandler extends 
AbstractMessageHandler {
 
     @Override
     public void close() {
-        for (String key : producerCache.keySet()) {
-            MessageProducer mp = producerCache.get(key);
+        for (final Map.Entry<String,MessageProducer> entry : 
producerCache.entrySet()) {
             try {
-                mp.close();
+                entry.getValue().close();
             } catch (JMSException e) {
-                ObjectHelper.wrapRuntimeCamelException(e);
+                log.debug("Cached MessageProducer with key:{} threw an 
unexpected exception", entry.getKey(), e);
             }
         }
         producerCache.clear();
@@ -146,12 +144,12 @@ public class InOutMessageHandler extends 
AbstractMessageHandler {
         return answer;
     }
 
-    protected class MessageHanderAsyncCallback implements AsyncCallback {
+    protected class MessageHandlerAsyncCallback implements AsyncCallback {
 
-        private Exchange exchange;
-        private MessageProducer localProducer;
+        private final Exchange exchange;
+        private final MessageProducer localProducer;
 
-        public MessageHanderAsyncCallback(Exchange exchange, MessageProducer 
localProducer) {
+        public MessageHandlerAsyncCallback(Exchange exchange, MessageProducer 
localProducer) {
             this.exchange = exchange;
             this.localProducer = localProducer;
         }
@@ -159,9 +157,8 @@ public class InOutMessageHandler extends 
AbstractMessageHandler {
         @Override
         public void done(boolean sync) {
             try {
-                Message response = 
SjmsExchangeMessageHelper.createMessage(exchange, getSession(),
-                        getEndpoint().getJmsKeyFormatStrategy(), 
getEndpoint().getCamelContext().getTypeConverter());
-                
response.setJMSCorrelationID(exchange.getIn().getHeader("JMSCorrelationID", 
String.class));
+                Message response = JmsMessageHelper.createMessage(exchange, 
getSession(), getEndpoint());
+                
response.setJMSCorrelationID(exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID,
 String.class));
                 localProducer.send(response);
             } catch (Exception e) {
                 exchange.setException(e);

http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
index d92d2c8..725cc20 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
@@ -19,15 +19,59 @@ package org.apache.camel.component.sjms.jms;
 /**
  * JMS constants
  */
-public final class JmsConstants {
+public interface JmsConstants {
 
-    public static final String JMS_DESTINATION = "CamelJmsDestination";
-    public static final String JMS_DESTINATION_NAME = 
"CamelJmsDestinationName";
-    public static final String JMS_MESSAGE_TYPE = "CamelJmsMessageType";
-    public static final String JMS_DELIVERY_MODE = "CamelJmsDeliveryMode";
+    /**
+     * Set by the publishing Client
+     */
+    public static final String JMS_CORRELATION_ID = "JMSCorrelationID";
+    /**
+     * Set on the send or publish event
+     */
+    public static final String JMS_DELIVERY_MODE = "JMSDeliveryMode";
+    /**
+     * Set on the send or publish event
+     */
+    public static final String JMS_DESTINATION = "JMSDestination";
+    /**
+     * Set on the send or publish event
+     */
+    public static final String JMS_EXPIRATION = "JMSExpiration";
+    /**
+     * Set on the send or publish event
+     */
+    public static final String JMS_MESSAGE_ID = "JMSMessageID";
+    /**
+     * Set on the send or publish event
+     */
+    public static final String JMS_PRIORITY = "JMSPriority";
+    /**
+     * A redelivery flag set by the JMS provider
+     */
+    public static final String JMS_REDELIVERED = "JMSRedelivered";
+    /**
+     * The JMS Reply To {@link javax.jms.Destination} set by the publishing 
Client
+     */
+    public static final String JMS_REPLY_TO = "JMSReplyTo";
+    /**
+     * Set on the send or publish event
+     */
+    public static final String JMS_TIMESTAMP = "JMSTimestamp";
+    /**
+     * Set by the publishing Client
+     */
+    public static final String JMS_TYPE = "JMSType";
 
-    private JmsConstants() {
-        // utility class
-    }
+    /**
+     * Custom headers
+     */
+    public static final String JMSX_GROUP_ID = "JMSXGroupID";
 
-}
+
+    /**
+     * String representation of JMS delivery modes.
+     */
+    public static final String JMS_DELIVERY_MODE_PERSISTENT = "PERSISTENT";
+    public static final String JMS_DELIVERY_MODE_NON_PERSISTENT = 
"NON_PERSISTENT";
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java
index 101423b..e69de29 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.sjms.jms;
-
-public enum JmsMessageHeaderType {
-    JMSDestination,
-    JMSDeliveryMode,
-    JMSExpiration,
-    JMSPriority,
-    JMSMessageID,
-    JMSTimestamp,
-    JMSCorrelationID,
-    JMSReplyTo,
-    JMSType,
-    JMSRedelivered,
-
-    /*
-     * Add known custom headers
-     */
-    JMSXGroupID
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
index 79c57be..5ca09b4 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
@@ -16,15 +16,17 @@
  */
 package org.apache.camel.component.sjms.jms;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.InputStream;
 import java.io.Reader;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.jms.BytesMessage;
@@ -38,63 +40,123 @@ import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.TypeConverter;
+import org.apache.camel.component.sjms.SjmsConstants;
+import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.impl.DefaultMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Utility class for {@link javax.jms.Message}.
  */
-public final class JmsMessageHelper {
-
-    /**
-     * Set by the publishing Client
-     */
-    public static final String JMS_CORRELATION_ID = "JMSCorrelationID";
-    /**
-     * Set on the send or publish event
-     */
-    public static final String JMS_DELIVERY_MODE = "JMSDeliveryMode";
-    /**
-     * Set on the send or publish event
-     */
-    public static final String JMS_DESTINATION = "JMSDestination";
-    /**
-     * Set on the send or publish event
-     */
-    public static final String JMS_EXPIRATION = "JMSExpiration";
-    /**
-     * Set on the send or publish event
-     */
-    public static final String JMS_MESSAGE_ID = "JMSMessageID";
-    /**
-     * Set on the send or publish event
-     */
-    public static final String JMS_PRIORITY = "JMSPriority";
-    /**
-     * A redelivery flag set by the JMS provider
-     */
-    public static final String JMS_REDELIVERED = "JMSRedelivered";
-    /**
-     * The JMS Reply To {@link Destination} set by the publishing Client
-     */
-    public static final String JMS_REPLY_TO = "JMSReplyTo";
-    /**
-     * Set on the send or publish event
-     */
-    public static final String JMS_TIMESTAMP = "JMSTimestamp";
-    /**
-     * Set by the publishing Client
-     */
-    public static final String JMS_TYPE = "JMSType";
+public final class JmsMessageHelper implements JmsConstants {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(JmsMessageHelper.class);
 
     private JmsMessageHelper() {
     }
 
-    //@SuppressWarnings("unchecked")
-    public static Message createMessage(Session session, Object payload, 
Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy, 
TypeConverter typeConverter) throws Exception {
+    /**
+     * Creates a new exchange from the endpoint and populates its IN message
+     * from the given JMS message.
+     *
+     * @param message  the JMS message to read headers and body from
+     * @param endpoint the endpoint creating the exchange; it is cast to
+     *                 {@link SjmsEndpoint} to obtain the key format strategy
+     * @return the result of {@link #populateExchange(Message, Exchange, boolean, KeyFormatStrategy)}
+     */
+    public static Exchange createExchange(Message message, Endpoint endpoint) {
+        Exchange exchange = endpoint.createExchange();
+        return populateExchange(message, exchange, false, 
((SjmsEndpoint)endpoint).getJmsKeyFormatStrategy());
+    }
+
+    /**
+     * Copies the headers and the payload of a JMS message onto a Camel exchange.
+     * <p/>
+     * The headers are populated first through
+     * {@link #setJmsMessageHeaders(Message, Exchange, boolean, KeyFormatStrategy)}.
+     * The body is then extracted according to the concrete JMS message type and,
+     * for typed messages, the detected {@link JmsMessageType} is recorded under
+     * {@link SjmsConstants#JMS_MESSAGE_TYPE}. Any exception raised while doing
+     * so is stored on the exchange instead of being thrown.
+     *
+     * @param message           the JMS message to read from; when <tt>null</tt> only an empty header map is set
+     * @param exchange          the exchange to populate
+     * @param out               <tt>true</tt> to populate the OUT message, <tt>false</tt> for the IN message
+     * @param keyFormatStrategy strategy used to decode the JMS property names
+     * @return the given exchange, or <tt>null</tt> if a {@link BytesMessage} body is
+     *         longer than {@link Integer#MAX_VALUE} bytes
+     */
+    @SuppressWarnings("unchecked")
+    public static Exchange populateExchange(Message message, Exchange exchange, boolean out, KeyFormatStrategy keyFormatStrategy) {
+        try {
+            setJmsMessageHeaders(message, exchange, out, keyFormatStrategy);
+            if (message != null) {
+                // extract the Camel body from the JMS message according to its type
+
+                DefaultMessage bodyMessage;
+                if (out) {
+                    bodyMessage = (DefaultMessage) exchange.getOut();
+                } else {
+                    bodyMessage = (DefaultMessage) exchange.getIn();
+                }
+                switch (JmsMessageHelper.discoverJmsMessageType(message)) {
+                    case Bytes:
+                        BytesMessage bytesMessage = (BytesMessage) message;
+                        if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) {
+                            LOGGER.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength());
+                            return null;
+                        }
+                        byte[] result = new byte[(int) bytesMessage.getBodyLength()];
+                        bytesMessage.readBytes(result);
+                        bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Bytes);
+                        bodyMessage.setBody(result);
+                        break;
+                    case Map:
+                        Map<String, Object> body = new HashMap<>();
+                        MapMessage mapMessage = (MapMessage) message;
+                        Enumeration<String> names = mapMessage.getMapNames();
+                        while (names.hasMoreElements()) {
+                            String key = names.nextElement();
+                            Object value = mapMessage.getObject(key);
+                            body.put(key, value);
+                        }
+                        bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Map);
+                        bodyMessage.setBody(body);
+                        break;
+                    case Object:
+                        ObjectMessage objMsg = (ObjectMessage) message;
+                        bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Object);
+                        bodyMessage.setBody(objMsg.getObject());
+                        break;
+                    case Text:
+                        TextMessage textMsg = (TextMessage) message;
+                        bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Text);
+                        bodyMessage.setBody(textMsg.getText());
+                        break;
+                    case Stream:
+                        StreamMessage streamMessage = (StreamMessage) message;
+                        List<Object> list = new ArrayList<>();
+                        try {
+                            Object obj;
+                            // a null element still ends the read, as before
+                            while ((obj = streamMessage.readObject()) != null) {
+                                list.add(obj);
+                            }
+                        } catch (javax.jms.MessageEOFException endOfStream) {
+                            // Normal termination: the JMS API signals the end of a
+                            // StreamMessage by throwing, not by returning null. Without
+                            // this catch the exchange would be marked as failed and the
+                            // body would never be set.
+                        }
+                        bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Stream);
+                        bodyMessage.setBody(list);
+                        break;
+                    case Message:
+                    default:
+                        // Plain JMS message without a typed payload: use the JMS
+                        // message itself as the body
+                        bodyMessage.setBody(message);
+                        break;
+                }
+            }
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+        return exchange;
+    }
+
+    /**
+     * Creates a JMS message from the given exchange.
+     * <p/>
+     * The body and headers are taken from the OUT message when the exchange has
+     * one with a non-null body, otherwise from the IN message.
+     *
+     * @param exchange the exchange providing body and headers
+     * @param session  the session used to create the JMS message
+     * @param endpoint the endpoint supplying the key format strategy and type converter
+     * @return the JMS message created by {@link #createMessage(Session, Object, Map, SjmsEndpoint)}
+     * @throws Exception if the JMS message cannot be created
+     */
+    public static Message createMessage(Exchange exchange, Session session, SjmsEndpoint endpoint) throws Exception {
+        // hasOut() is checked first because getOut() lazily creates an empty
+        // OUT message, which would turn an IN-only exchange into one that
+        // appears to carry a reply.
+        final org.apache.camel.Message source;
+        if (exchange.hasOut() && exchange.getOut().getBody() != null) {
+            source = exchange.getOut();
+        } else {
+            source = exchange.getIn();
+        }
+
+        Object body = source.getBody();
+        // copy the headers so the caller's map is never modified
+        Map<String, Object> bodyHeaders = new HashMap<>(source.getHeaders());
+        return createMessage(session, body, bodyHeaders, endpoint);
+    }
+
+    /**
+     * Creates a JMS message for the given payload and headers, using the key
+     * format strategy and the type converter obtained from the endpoint.
+     *
+     * @param session        the session used to create the JMS message
+     * @param payload        the body of the message
+     * @param messageHeaders the headers to apply to the message
+     * @param endpoint       the endpoint supplying the key format strategy and,
+     *                       through its Camel context, the type converter
+     * @return the created JMS message
+     * @throws Exception if the JMS message cannot be created
+     */
+    public static Message createMessage(Session session, Object payload, 
Map<String, Object> messageHeaders, SjmsEndpoint endpoint) throws Exception {
+        return createMessage(session, payload, messageHeaders, 
endpoint.getJmsKeyFormatStrategy(), 
endpoint.getCamelContext().getTypeConverter());
+    }
+
+    private static Message createMessage(Session session, Object payload, 
Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy, 
TypeConverter typeConverter) throws Exception {
         Message answer = null;
         JmsMessageType messageType = 
JmsMessageHelper.discoverMessageTypeFromPayload(payload);
         try {
@@ -108,11 +170,9 @@ public final class JmsMessageHelper {
                 break;
             case Map:
                 MapMessage mapMessage = session.createMapMessage();
-                Map<String, Object> objMap = (Map<String, Object>) payload;
-                Set<String> keys = objMap.keySet();
-                for (String key : keys) {
-                    Object value = objMap.get(key);
-                    mapMessage.setObject(key, value);
+                Map objMap = (Map) payload;
+                for (final Map.Entry entry : 
(Set<Map.Entry>)objMap.entrySet()) {
+                    mapMessage.setObject(entry.getKey().toString(), 
entry.getValue());
                 }
                 answer = mapMessage;
                 break;
@@ -128,18 +188,12 @@ public final class JmsMessageHelper {
                 answer = textMessage;
                 break;
             case Stream:
-                ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                InputStream is = typeConverter.convertTo(InputStream.class, 
payload);
-                int reads = is.read();
-                while (reads != -1) {
-                    baos.write(reads);
-                    reads = is.read();
+                StreamMessage streamMessage = session.createStreamMessage();
+                Collection collection = (Collection)payload;
+                for (final Object obj : collection) {
+                    streamMessage.writeObject(obj);
                 }
-                BytesMessage bytesStreamMessage = session.createBytesMessage();
-                bytesStreamMessage.writeBytes(baos.toByteArray());
-                baos.close();
-                is.close();
-                answer = bytesStreamMessage;
+                answer = streamMessage;
                 break;
             default:
                 break;
@@ -165,19 +219,11 @@ public final class JmsMessageHelper {
      *                          {@link DefaultJmsKeyFormatStrategy} will be 
used.
      * @return {@link Message}
      */
-    public static Message setJmsMessageHeaders(final Message jmsMessage, 
Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws 
IllegalHeaderException {
-        // Support for the null keyFormatStrategy
-        KeyFormatStrategy localKeyFormatStrategy = null;
-        if (keyFormatStrategy == null) {
-            localKeyFormatStrategy = new DefaultJmsKeyFormatStrategy();
-        } else {
-            localKeyFormatStrategy = keyFormatStrategy;
-        }
-
-        Map<String, Object> headers = new HashMap<String, 
Object>(messageHeaders);
-        Set<String> keys = headers.keySet();
-        for (String headerName : keys) {
-            Object headerValue = headers.get(headerName);
+    private static Message setJmsMessageHeaders(final Message jmsMessage, 
Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws 
IllegalHeaderException {
+        Map<String, Object> headers = new HashMap<>(messageHeaders);
+        for (final Map.Entry<String,Object> entry : headers.entrySet()) {
+            String headerName = entry.getKey();
+            Object headerValue = entry.getValue();
 
             if (headerName.equalsIgnoreCase(JMS_CORRELATION_ID)) {
                 if (headerValue == null) {
@@ -241,8 +287,8 @@ public final class JmsMessageHelper {
                 }
             } else {
                 LOGGER.trace("Ignoring JMS header: {} with value: {}", 
headerName, headerValue);
-                if (headerName.equalsIgnoreCase(JMS_DESTINATION) || 
headerName.equalsIgnoreCase(JMS_MESSAGE_ID) || 
headerName.equalsIgnoreCase("JMSTimestamp")
-                        || headerName.equalsIgnoreCase("JMSRedelivered")) {
+                if (headerName.equalsIgnoreCase(JMS_DESTINATION) || 
headerName.equalsIgnoreCase(JMS_MESSAGE_ID) || 
headerName.equalsIgnoreCase(JMS_TIMESTAMP)
+                        || headerName.equalsIgnoreCase(JMS_REDELIVERED)) {
                     // The following properties are set by the
                     // MessageProducer:
                     // JMSDestination
@@ -252,7 +298,7 @@ public final class JmsMessageHelper {
                     LOGGER.trace("Ignoring JMS header: {} with value: {}", 
headerName, headerValue);
                 } else {
                     if (!(headerValue instanceof JmsMessageType)) {
-                        String encodedName = 
localKeyFormatStrategy.encodeKey(headerName);
+                        String encodedName = 
keyFormatStrategy.encodeKey(headerName);
                         try {
                             JmsMessageHelper.setProperty(jmsMessage, 
encodedName, headerValue);
                         } catch (JMSException e) {
@@ -260,12 +306,68 @@ public final class JmsMessageHelper {
                         }
                     }
                 }
-                // }
             }
         }
         return jmsMessage;
     }
 
+    /**
+     * Copies the JMS headers and properties of the given message into the
+     * headers of the exchange's IN or OUT message.
+     * <p/>
+     * The standard JMS headers and the <tt>JMSXGroupID</tt> property are added
+     * under their JMS names. Every name returned by
+     * {@link Message#getPropertyNames()} is then validated, decoded with
+     * {@link KeyFormatStrategy#decodeKey(String)} and added with its value.
+     *
+     * @param jmsMessage        the JMS message to read from; when <tt>null</tt> an empty header map is set
+     * @param exchange          the exchange to populate
+     * @param out               <tt>true</tt> to set the headers on the OUT message, <tt>false</tt> for the IN message
+     * @param keyFormatStrategy strategy used to decode the JMS property names
+     * @return the given exchange
+     * @throws JMSException           if a property cannot be read
+     * @throws IllegalHeaderException if a property name is <tt>null</tt>, empty, or contains <tt>.</tt> or <tt>-</tt>
+     * @throws RuntimeCamelException  if one of the standard JMS headers cannot be read
+     */
+    @SuppressWarnings("unchecked")
+    public static Exchange setJmsMessageHeaders(final Message jmsMessage, 
final Exchange exchange, boolean out, KeyFormatStrategy keyFormatStrategy) 
throws JMSException {
+        Map<String, Object> headers = new HashMap<>();
+        if (jmsMessage != null) {
+            // lets populate the standard JMS message headers
+            try {
+                headers.put(JMS_CORRELATION_ID, 
jmsMessage.getJMSCorrelationID());
+                headers.put(JMS_DELIVERY_MODE, 
jmsMessage.getJMSDeliveryMode());
+                headers.put(JMS_DESTINATION, jmsMessage.getJMSDestination());
+                headers.put(JMS_EXPIRATION, jmsMessage.getJMSExpiration());
+                headers.put(JMS_MESSAGE_ID, jmsMessage.getJMSMessageID());
+                headers.put(JMS_PRIORITY, jmsMessage.getJMSPriority());
+                headers.put(JMS_REDELIVERED, jmsMessage.getJMSRedelivered());
+                headers.put(JMS_TIMESTAMP, jmsMessage.getJMSTimestamp());
+                headers.put(JMS_REPLY_TO, getJMSReplyTo(jmsMessage));
+                headers.put(JMS_TYPE, getJMSType(jmsMessage));
+
+                // this works around a bug in the ActiveMQ property handling
+                headers.put(JMSX_GROUP_ID, 
jmsMessage.getStringProperty(JMSX_GROUP_ID));
+            } catch (JMSException e) {
+                throw new RuntimeCamelException(e);
+            }
+
+            for (final Enumeration<String> enumeration = 
jmsMessage.getPropertyNames(); enumeration.hasMoreElements();) {
+                String key = enumeration.nextElement();
+                if (hasIllegalHeaderKey(key)) {
+                    throw new IllegalHeaderException("Header " + key + " is 
not a legal JMS header name value");
+                }
+                Object value = jmsMessage.getObjectProperty(key);
+                String decodedName = keyFormatStrategy.decodeKey(key);
+                headers.put(decodedName, value);
+            }
+        }
+        if (out) {
+            exchange.getOut().setHeaders(headers);
+        } else {
+            exchange.getIn().setHeaders(headers);
+        }
+        return exchange;
+    }
+
+    /**
+     * Gets the JMSType from the message.
+     * <p/>
+     * Any exception thrown while reading the header is swallowed and
+     * <tt>null</tt> is returned instead.
+     *
+     * @param message the message
+     * @return the type, can be <tt>null</tt>
+     */
+    public static String getJMSType(Message message) {
+        try {
+            return message.getJMSType();
+        } catch (Exception e) {
+            // ignored on purpose: OracleAQ does not support accessing JMSType
+        }
+
+        return null;
+    }
+
     /**
      * Sets the JMSDeliveryMode on the message.
      *
@@ -278,9 +380,9 @@ public final class JmsMessageHelper {
 
         if (deliveryMode instanceof String) {
             String s = (String) deliveryMode;
-            if ("PERSISTENT".equalsIgnoreCase(s)) {
+            if (JMS_DELIVERY_MODE_PERSISTENT.equalsIgnoreCase(s)) {
                 mode = DeliveryMode.PERSISTENT;
-            } else if ("NON_PERSISTENT".equalsIgnoreCase(s)) {
+            } else if (JMS_DELIVERY_MODE_NON_PERSISTENT.equalsIgnoreCase(s)) {
                 mode = DeliveryMode.NON_PERSISTENT;
             } else {
                 // it may be a number in the String so try that
@@ -339,6 +441,35 @@ public final class JmsMessageHelper {
     }
 
     /**
+     * Sets the JMSReplyTo on the message. A failure is logged at DEBUG level and not rethrown.
+     *
+     * @param message the message
+     * @param replyTo the reply to destination; passed to the provider as is, including <tt>null</tt>
+     */
+    public static void setJMSReplyTo(Message message, Destination replyTo) {
+        try {
+            message.setJMSReplyTo(replyTo);
+        } catch (Exception e) {
+            // replyTo is passed as a log argument (never dereferenced) so a null
+            // destination cannot raise a NullPointerException here; the exception
+            // is passed last so that its stack trace is logged as well
+            LOGGER.debug("Error setting the replyTo: {}", replyTo, e);
+        }
+    }
+
+    /**
+     * Gets the JMSReplyTo from the message.
+     * <p/>
+     * Any exception thrown while reading the header is swallowed and
+     * <tt>null</tt> is returned instead.
+     *
+     * @param message the message
+     * @return the reply to, can be <tt>null</tt>
+     */
+    public static Destination getJMSReplyTo(Message message) {
+        try {
+            return message.getJMSReplyTo();
+        } catch (Exception e) {
+            // ignored on purpose: OracleAQ does not support accessing JMSReplyTo
+        }
+        return null;
+    }
+
+    /**
      * Sets the property on the given JMS message.
      *
      * @param jmsMessage the JMS message
@@ -348,9 +479,8 @@ public final class JmsMessageHelper {
      */
     public static void setProperty(Message jmsMessage, String name, Object 
value) throws JMSException {
         if (value == null) {
-            return;
-        }
-        if (value instanceof Byte) {
+            jmsMessage.setObjectProperty(name, null);
+        } else if (value instanceof Byte) {
             jmsMessage.setByteProperty(name, (Byte) value);
         } else if (value instanceof Boolean) {
             jmsMessage.setBooleanProperty(name, (Boolean) value);
@@ -373,27 +503,31 @@ public final class JmsMessageHelper {
     }
 
     public static JmsMessageType discoverMessageTypeFromPayload(final Object 
payload) {
-        JmsMessageType answer = null;
+        JmsMessageType answer;
         // Default is a JMS Message since a body is not required
         if (payload == null) {
             answer = JmsMessageType.Message;
         } else {
             // Something was found in the body so determine
             // what type of message we need to create
-            if (Byte[].class.isInstance(payload)) {
+            if (byte[].class.isInstance(payload)) {
                 answer = JmsMessageType.Bytes;
-            } else if (Collection.class.isInstance(payload)) {
+            } else if (Map.class.isInstance(payload)) {
                 answer = JmsMessageType.Map;
-            } else if (InputStream.class.isInstance(payload)) {
+            } else if (Collection.class.isInstance(payload)) {
                 answer = JmsMessageType.Stream;
+            } else if (InputStream.class.isInstance(payload)) {
+                answer = JmsMessageType.Bytes;
             } else if (ByteBuffer.class.isInstance(payload)) {
                 answer = JmsMessageType.Bytes;
             } else if (File.class.isInstance(payload)) {
                 answer = JmsMessageType.Bytes;
             } else if (Reader.class.isInstance(payload)) {
-                answer = JmsMessageType.Bytes;
+                answer = JmsMessageType.Text;
             } else if (String.class.isInstance(payload)) {
                 answer = JmsMessageType.Text;
+            } else if (CharBuffer.class.isInstance(payload)) {
+                answer = JmsMessageType.Text;
             } else if (char[].class.isInstance(payload)) {
                 answer = JmsMessageType.Text;
             } else if (Character.class.isInstance(payload)) {
@@ -408,7 +542,7 @@ public final class JmsMessageHelper {
     }
 
     public static JmsMessageType discoverJmsMessageType(Message message) {
-        JmsMessageType answer = null;
+        JmsMessageType answer;
         if (message != null) {
             if (BytesMessage.class.isInstance(message)) {
                 answer = JmsMessageType.Bytes;
@@ -428,4 +562,9 @@ public final class JmsMessageHelper {
         }
         return answer;
     }
-}
+
+    /**
+     * Tests whether a JMS property name is rejected as a header key.
+     *
+     * @param key the property name to test
+     * @return <tt>true</tt> if the key is <tt>null</tt>, empty, or contains a
+     *         <tt>.</tt> or <tt>-</tt> character
+     */
+    private static boolean hasIllegalHeaderKey(String key) {
+        return key == null || key.isEmpty() || key.contains(".") || 
key.contains("-");
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index 986a7bd..a1fbd7a 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -85,7 +85,7 @@ public class InOnlyProducer extends SjmsProducer {
     @Override
     public void sendMessage(final Exchange exchange, final AsyncCallback 
callback, final MessageProducerResources producer) throws Exception {
         try {
-            Collection<Message> messages = new ArrayList<Message>(1);
+            Collection<Message> messages = new ArrayList<>(1);
             if (exchange.getIn().getBody() != null) {
                 if (exchange.getIn().getBody() instanceof List) {
                     Iterable<?> payload = (Iterable<?>) 
exchange.getIn().getBody();
@@ -93,19 +93,15 @@ public class InOnlyProducer extends SjmsProducer {
                         Message message;
                         if (BatchMessage.class.isInstance(object)) {
                             BatchMessage<?> batchMessage = (BatchMessage<?>) 
object;
-                            message = 
JmsMessageHelper.createMessage(producer.getSession(), 
batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
-                                    .getJmsKeyFormatStrategy(), 
getSjmsEndpoint().getCamelContext().getTypeConverter());
+                            message = 
JmsMessageHelper.createMessage(producer.getSession(), 
batchMessage.getPayload(), batchMessage.getHeaders(), getEndpoint());
                         } else {
-                            message = 
JmsMessageHelper.createMessage(producer.getSession(), object, 
exchange.getIn().getHeaders(),
-                                    
getSjmsEndpoint().getJmsKeyFormatStrategy(), 
getSjmsEndpoint().getCamelContext().getTypeConverter());
+                            message = 
JmsMessageHelper.createMessage(producer.getSession(), object, 
exchange.getIn().getHeaders(), getEndpoint());
                         }
                         messages.add(message);
                     }
                 } else {
                     Object payload = exchange.getIn().getBody();
-                    Message message = JmsMessageHelper
-                            .createMessage(producer.getSession(), payload, 
exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy(),
-                                    
getSjmsEndpoint().getCamelContext().getTypeConverter());
+                    Message message = 
JmsMessageHelper.createMessage(producer.getSession(), payload, 
exchange.getIn().getHeaders(), getEndpoint());
                     messages.add(message);
                 }
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index be336b3..55f7019 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -19,12 +19,11 @@ package org.apache.camel.component.sjms.producer;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Exchanger;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Message;
@@ -39,8 +38,9 @@ import org.apache.camel.Exchange;
 import org.apache.camel.component.sjms.MessageConsumerResources;
 import org.apache.camel.component.sjms.MessageProducerResources;
 import org.apache.camel.component.sjms.SjmsEndpoint;
-import org.apache.camel.component.sjms.SjmsExchangeMessageHelper;
 import org.apache.camel.component.sjms.SjmsProducer;
+import org.apache.camel.component.sjms.jms.JmsConstants;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.component.sjms.jms.JmsObjectFactory;
 import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
 import org.apache.camel.util.ObjectHelper;
@@ -52,12 +52,7 @@ import org.apache.commons.pool.impl.GenericObjectPool;
  */
 public class InOutProducer extends SjmsProducer {
 
-    /**
-     * We use the {@link ReadWriteLock} to manage the {@link TreeMap} in place
-     * of a {@link ConcurrentMap} because due to significant performance gains.
-     */
-    private static Map<String, Exchanger<Object>> exchangerMap = new TreeMap<String, Exchanger<Object>>();
-    private ReadWriteLock lock = new ReentrantReadWriteLock();
+    private static final Map<String, Exchanger<Object>> EXCHANGERS = new ConcurrentHashMap<>();
 
     /**
      * A pool of {@link MessageConsumerResources} objects that are the reply
@@ -87,16 +82,14 @@ public class InOutProducer extends SjmsProducer {
                 messageConsumer.setMessageListener(new MessageListener() {
 
                     @Override
-                    public void onMessage(Message message) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Message Received in the Consumer Pool");
-                            log.debug("  Message : {}", message);
-                        }
+                    public void onMessage(final Message message) {
+                        log.debug("Message Received in the Consumer Pool");
+                        log.debug("  Message : {}", message);
                         try {
-                            Exchanger<Object> exchanger = exchangerMap.get(message.getJMSCorrelationID());
+                            Exchanger<Object> exchanger = EXCHANGERS.get(message.getJMSCorrelationID());
                             exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS);
                         } catch (Exception e) {
-                            ObjectHelper.wrapRuntimeCamelException(e);
+                            log.error("Unable to exchange message: {}", message, e);
                         }
 
                     }
@@ -132,9 +125,8 @@ public class InOutProducer extends SjmsProducer {
 
     private GenericObjectPool<MessageConsumerResources> consumers;
 
-    public InOutProducer(SjmsEndpoint endpoint) {
+    public InOutProducer(final SjmsEndpoint endpoint) {
         super(endpoint);
-        endpoint.getConsumerCount();
     }
 
     @Override
@@ -195,28 +187,23 @@ public class InOutProducer extends SjmsProducer {
             exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
         }
 
-        Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy(), getSjmsEndpoint().getCamelContext().getTypeConverter());
+        Message request = JmsMessageHelper.createMessage(exchange, producer.getSession(), getEndpoint());
 
         // TODO just set the correlation id don't get it from the
         // message
-        String correlationId = null;
-        if (exchange.getIn().getHeader("JMSCorrelationID", String.class) == null) {
+        String correlationId;
+        if (exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class) == null) {
             correlationId = UUID.randomUUID().toString().replace("-", "");
         } else {
-            correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class);
+            correlationId = exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class);
         }
         Object responseObject = null;
-        Exchanger<Object> messageExchanger = new Exchanger<Object>();
-        SjmsExchangeMessageHelper.setCorrelationId(request, correlationId);
-        lock.writeLock().lock();
-        try {
-            exchangerMap.put(correlationId, messageExchanger);
-        } finally {
-            lock.writeLock().unlock();
-        }
+        Exchanger<Object> messageExchanger = new Exchanger<>();
+        JmsMessageHelper.setCorrelationId(request, correlationId);
+        EXCHANGERS.put(correlationId, messageExchanger);
 
         MessageConsumerResources consumer = consumers.borrowObject();
-        SjmsExchangeMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
+        JmsMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
         consumers.returnObject(consumer);
         producer.getMessageProducer().send(request);
 
@@ -231,13 +218,7 @@ public class InOutProducer extends SjmsProducer {
 
         try {
             responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS);
-
-            lock.writeLock().lock();
-            try {
-                exchangerMap.remove(correlationId);
-            } finally {
-                lock.writeLock().unlock();
-            }
+            EXCHANGERS.remove(correlationId);
         } catch (InterruptedException e) {
             log.debug("Exchanger was interrupted while waiting on response", e);
             exchange.setException(e);
@@ -251,7 +232,7 @@ public class InOutProducer extends SjmsProducer {
                 exchange.setException((Throwable) responseObject);
             } else if (responseObject instanceof Message) {
                 Message response = (Message) responseObject;
-                SjmsExchangeMessageHelper.populateExchange(response, exchange, true, getEndpoint().getJmsKeyFormatStrategy());
+                JmsMessageHelper.populateExchange(response, exchange, true, getEndpoint().getJmsKeyFormatStrategy());
             } else {
                 exchange.setException(new CamelException("Unknown response type: " + responseObject));
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java
index ab5d0ef..6a228b0 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java
@@ -28,11 +28,15 @@ import java.io.InputStream;
 import java.io.Reader;
 import java.io.StringReader;
 import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class JMSMessageHelperTypeConversionTest extends JmsTestSupport {
@@ -49,10 +53,45 @@ public class JMSMessageHelperTypeConversionTest extends JmsTestSupport {
         assertMockEndpointsSatisfied();
         assertTrue(String.class.isInstance(message.getIn().getBody()));
     }
-    
+
+    @Test
+    public void testJMSMessageHelperMap() throws Exception {
+        getMockEndpoint(MOCK_RESULT_URI).expectedMessageCount(1);
+
+        Map<Object,Object> map = new HashMap<>();
+        map.put("Hello", "Camel");
+        map.put("Int", Integer.MAX_VALUE);
+        map.put("Boolean", Boolean.TRUE);
+        map.put(Boolean.TRUE, Long.MAX_VALUE);
+
+        template.sendBody(SJMS_QUEUE_URI, map);
+        assertMockEndpointsSatisfied();
+        assertTrue(Map.class.isInstance(message.getIn().getBody()));
+        assertEquals("Camel", message.getIn().getBody(Map.class).get("Hello"));
+        assertEquals(Integer.MAX_VALUE, message.getIn().getBody(Map.class).get("Int"));
+        assertEquals(Boolean.TRUE, message.getIn().getBody(Map.class).get("Boolean"));
+        assertEquals(Long.MAX_VALUE, message.getIn().getBody(Map.class).get("true"));
+    }
+
+    @Ignore
+    @Test
+    public void testJMSMessageHelperCollection() throws Exception {
+        // TODO: Once SJMS can accept a Collection as Body
+    }
+
+    @Test
+    public void testJMSMessageHelperByteArray() throws Exception {
+        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes());
+
+        byte[] bytes = "Hello Camel".getBytes();
+        template.sendBody(SJMS_QUEUE_URI, bytes);
+        assertMockEndpointsSatisfied();
+        assertTrue(byte[].class.isInstance(message.getIn().getBody()));
+    }
+
     @Test
     public void testJMSMessageHelperInputStream() throws Exception {
-        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel");
+        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes());
         String p = "Hello Camel";
         InputStream is = new ByteArrayInputStream(p.getBytes());
         template.sendBody(SJMS_QUEUE_URI, is);
@@ -61,10 +100,19 @@ public class JMSMessageHelperTypeConversionTest extends JmsTestSupport {
     }
     
     @Test
-    public void testJMSMessageHelperByteBuffer() throws Exception {
+    public void testJMSMessageHelperCharBuffer() throws Exception {
         getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel");
+        CharBuffer cb = CharBuffer.wrap("Hello Camel");
+        template.sendBody(SJMS_QUEUE_URI, cb);
+        assertMockEndpointsSatisfied();
+        assertTrue(String.class.isInstance(message.getIn().getBody()));
+    }
+
+    @Test
+    public void testJMSMessageHelperByteBuffer() throws Exception {
+        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes());
         String p = "Hello Camel";
-        ByteBuffer bb = ByteBuffer.wrap(p.getBytes()); 
+        ByteBuffer bb = ByteBuffer.wrap(p.getBytes());
         template.sendBody(SJMS_QUEUE_URI, bb);
         assertMockEndpointsSatisfied();
         assertTrue(byte[].class.isInstance(message.getIn().getBody()));
@@ -72,7 +120,7 @@ public class JMSMessageHelperTypeConversionTest extends JmsTestSupport {
     
     @Test
     public void testJMSMessageHelperFile() throws InterruptedException, IOException {
-        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel");
+        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes());
         String p = "Hello Camel";
         File f = File.createTempFile("tmp-test", ".txt");
         BufferedWriter bw = new BufferedWriter(new FileWriter(f));
@@ -96,9 +144,8 @@ public class JMSMessageHelperTypeConversionTest extends JmsTestSupport {
         Reader test = new BufferedReader(new FileReader(f.getAbsolutePath()));
         template.sendBody(SJMS_QUEUE_URI, test);
         assertMockEndpointsSatisfied();
-        boolean resultDelete = f.delete();
-        assertTrue(resultDelete);
-        assertTrue(byte[].class.isInstance(message.getIn().getBody()));
+        assertTrue(f.delete());
+        assertTrue(String.class.isInstance(message.getIn().getBody()));
     }
     
     @Test
@@ -108,7 +155,7 @@ public class JMSMessageHelperTypeConversionTest extends JmsTestSupport {
         StringReader test = new StringReader(p);
         template.sendBody(SJMS_QUEUE_URI, test);
         assertMockEndpointsSatisfied();
-        assertTrue(byte[].class.isInstance(message.getIn().getBody()));
+        assertTrue(String.class.isInstance(message.getIn().getBody()));
     }
     
     @Test
@@ -143,7 +190,7 @@ public class JMSMessageHelperTypeConversionTest extends JmsTestSupport {
             public void configure() throws Exception {
                 interceptSendToEndpoint(MOCK_RESULT_URI).process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
-                        message = (Exchange) exchange;
+                        message = exchange;
                     }
                 });
                 

Reply via email to