CAMEL-7949: Harden sjms component. Thanks to Aaron Whiteside for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1dc631ee Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1dc631ee Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1dc631ee Branch: refs/heads/master Commit: 1dc631ee9860da773371c16d2fa6982ae92b13d7 Parents: 3e1951a Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Nov 29 10:31:16 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Nov 29 10:31:16 2014 +0100 ---------------------------------------------------------------------- .../camel/component/sjms/SjmsConstants.java | 13 +- .../camel/component/sjms/SjmsEndpoint.java | 2 +- .../sjms/SjmsExchangeMessageHelper.java | 467 ------------------- .../sjms/consumer/AbstractMessageHandler.java | 12 +- .../sjms/consumer/InOnlyMessageHandler.java | 8 +- .../sjms/consumer/InOutMessageHandler.java | 27 +- .../camel/component/sjms/jms/JmsConstants.java | 62 ++- .../sjms/jms/JmsMessageHeaderType.java | 36 -- .../component/sjms/jms/JmsMessageHelper.java | 321 +++++++++---- .../component/sjms/producer/InOnlyProducer.java | 12 +- .../component/sjms/producer/InOutProducer.java | 59 +-- .../JMSMessageHelperTypeConversionTest.java | 67 ++- 12 files changed, 384 insertions(+), 702 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java index 035c93a..b39e51e 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java @@ 
-16,17 +16,8 @@ */ package org.apache.camel.component.sjms; -public final class SjmsConstants { - public static final String QUEUE_PREFIX = "queue:"; - public static final String TOPIC_PREFIX = "topic:"; - public static final String TEMP_QUEUE_PREFIX = "temp:queue:"; - public static final String TEMP_TOPIC_PREFIX = "temp:topic:"; +public interface SjmsConstants { public static final String JMS_MESSAGE_TYPE = "JmsMessageType"; - public static final String ORIGINAL_MESSAGE = "SjmsOriginalMessage"; - private SjmsConstants() { - // Helper class - } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java index 6284c13..6841d77 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java @@ -121,7 +121,7 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu @Override public Producer createProducer() throws Exception { - SjmsProducer producer = null; + SjmsProducer producer; if (getExchangePattern().equals(ExchangePattern.InOnly)) { producer = new InOnlyProducer(this); } else { http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java index 434a3c2..e69de29 
100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java @@ -1,467 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.component.sjms; - -import java.io.ByteArrayOutputStream; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Map.Entry; -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.ObjectMessage; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; - -import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; -import org.apache.camel.RuntimeCamelException; -import org.apache.camel.TypeConverter; -import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy; -import org.apache.camel.component.sjms.jms.IllegalHeaderException; -import org.apache.camel.component.sjms.jms.JmsConstants; -import org.apache.camel.component.sjms.jms.JmsMessageHeaderType; -import org.apache.camel.component.sjms.jms.JmsMessageHelper; -import org.apache.camel.component.sjms.jms.JmsMessageType; -import org.apache.camel.component.sjms.jms.KeyFormatStrategy; -import org.apache.camel.impl.DefaultMessage; -import org.apache.camel.util.ExchangeHelper; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.camel.component.sjms.SjmsConstants.JMS_MESSAGE_TYPE; -import static org.apache.camel.component.sjms.SjmsConstants.QUEUE_PREFIX; -import static org.apache.camel.component.sjms.SjmsConstants.TOPIC_PREFIX; -import static org.apache.camel.util.ObjectHelper.removeStartingCharacters; - -public final class SjmsExchangeMessageHelper { - private static final Logger LOGGER = LoggerFactory.getLogger(SjmsExchangeMessageHelper.class); - - private SjmsExchangeMessageHelper() { - } - - public static Exchange createExchange(Message message, Endpoint endpoint) { - Exchange exchange = endpoint.createExchange(); - return 
populateExchange(message, exchange, false, ((SjmsEndpoint)endpoint).getJmsKeyFormatStrategy()); - } - - @Deprecated - /** - * Please use the one which has the parameter of keyFormatStrategy - */ - public static Exchange populateExchange(Message message, Exchange exchange, boolean out) { - return populateExchange(message, exchange, out, new DefaultJmsKeyFormatStrategy()); - } - - @SuppressWarnings("unchecked") - public static Exchange populateExchange(Message message, Exchange exchange, boolean out, KeyFormatStrategy keyFormatStrategy) { - try { - SjmsExchangeMessageHelper.setJmsMessageHeaders(message, exchange, out, keyFormatStrategy); - if (message != null) { - // convert to JMS Message of the given type - - DefaultMessage bodyMessage = null; - if (out) { - bodyMessage = (DefaultMessage) exchange.getOut(); - } else { - bodyMessage = (DefaultMessage) exchange.getIn(); - } - switch (JmsMessageHelper.discoverJmsMessageType(message)) { - case Bytes: - BytesMessage bytesMessage = (BytesMessage) message; - if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) { - LOGGER.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength()); - return null; - } - byte[] result = new byte[(int) bytesMessage.getBodyLength()]; - bytesMessage.readBytes(result); - bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Bytes); - bodyMessage.setBody(result); - break; - case Map: - Map<String, Object> body = new HashMap<String, Object>(); - MapMessage mapMessage = (MapMessage) message; - Enumeration<String> names = mapMessage.getMapNames(); - while (names.hasMoreElements()) { - String key = names.nextElement(); - Object value = mapMessage.getObject(key); - body.put(key, value); - } - bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Map); - bodyMessage.setBody(body); - break; - case Object: - ObjectMessage objMsg = (ObjectMessage) message; - bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Object); - bodyMessage.setBody(objMsg.getObject()); - break; - case Text: 
- TextMessage textMsg = (TextMessage) message; - bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Text); - bodyMessage.setBody(textMsg.getText()); - break; - case Stream: - StreamMessage streamMessage = (StreamMessage) message; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - int next = streamMessage.readByte(); - while (next > -1) { - baos.write(next); - next = streamMessage.readByte(); - } - baos.flush(); - bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Bytes); - bodyMessage.setBody(baos.toByteArray()); - break; - case Message: - default: - // Do nothing. Only set the headers for an empty message - bodyMessage.setBody(message); - break; - } - } - } catch (Exception e) { - exchange.setException(e); - } - return exchange; - } - - /** - * Removes the property from the JMS message. - * - * @param jmsMessage the JMS message - * @param name name of the property to remove - * @return the old value of the property or <tt>null</tt> if not exists - * @throws JMSException can be thrown - */ - public static Object removeJmsProperty(Message jmsMessage, String name) throws JMSException { - // check if the property exists - if (!jmsMessage.propertyExists(name)) { - return null; - } - - Object answer = null; - - // store the properties we want to keep in a temporary map - // as the JMS API is a bit strict as we are not allowed to - // clear a single property, but must clear them all and redo - // the properties - Map<String, Object> map = new LinkedHashMap<String, Object>(); - Enumeration<?> en = jmsMessage.getPropertyNames(); - while (en.hasMoreElements()) { - String key = (String) en.nextElement(); - if (name.equals(key)) { - answer = key; - } else { - map.put(key, jmsMessage.getObjectProperty(key)); - } - } - - // redo the properties to keep - jmsMessage.clearProperties(); - for (Entry<String, Object> entry : map.entrySet()) { - jmsMessage.setObjectProperty(entry.getKey(), entry.getValue()); - } - - return answer; - } - - /** - * Tests whether a 
given property with the name exists - * - * @param jmsMessage the JMS message - * @param name name of the property to test if exists - * @return <tt>true</tt> if the property exists, <tt>false</tt> if not. - * @throws JMSException can be thrown - */ - public static boolean hasProperty(Message jmsMessage, String name) throws JMSException { - Enumeration<?> en = jmsMessage.getPropertyNames(); - while (en.hasMoreElements()) { - String key = (String) en.nextElement(); - if (name.equals(key)) { - return true; - } - } - return false; - } - - /** - * Sets the property on the given JMS message. - * - * @param jmsMessage the JMS message - * @param name name of the property to set - * @param value the value - * @throws JMSException can be thrown - */ - public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException { - if (value == null) { - return; - } - if (value instanceof Byte) { - jmsMessage.setByteProperty(name, (Byte) value); - } else if (value instanceof Boolean) { - jmsMessage.setBooleanProperty(name, (Boolean) value); - } else if (value instanceof Double) { - jmsMessage.setDoubleProperty(name, (Double) value); - } else if (value instanceof Float) { - jmsMessage.setFloatProperty(name, (Float) value); - } else if (value instanceof Integer) { - jmsMessage.setIntProperty(name, (Integer) value); - } else if (value instanceof Long) { - jmsMessage.setLongProperty(name, (Long) value); - } else if (value instanceof Short) { - jmsMessage.setShortProperty(name, (Short) value); - } else if (value instanceof String) { - jmsMessage.setStringProperty(name, (String) value); - } else { - // fallback to Object - jmsMessage.setObjectProperty(name, value); - } - } - - /** - * Sets the correlation id on the JMS message. 
- * <p/> - * Will ignore exception thrown - * - * @param message the JMS message - * @param correlationId the correlation id - */ - public static void setCorrelationId(Message message, String correlationId) { - try { - message.setJMSCorrelationID(correlationId); - } catch (JMSException e) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Error setting the correlationId: {}", correlationId); - } - } - } - - /** - * Normalizes the destination name, by removing any leading queue or topic - * prefixes. - * - * @param destination the destination - * @return the normalized destination - */ - public static String normalizeDestinationName(String destination) { - if (ObjectHelper.isEmpty(destination)) { - return destination; - } - if (destination.startsWith(QUEUE_PREFIX)) { - return removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/'); - } else if (destination.startsWith(TOPIC_PREFIX)) { - return removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/'); - } else { - return destination; - } - } - - /** - * Sets the JMSReplyTo on the message. - * - * @param message the message - * @param replyTo the reply to destination - */ - public static void setJMSReplyTo(Message message, Destination replyTo) { - try { - message.setJMSReplyTo(replyTo); - } catch (Exception e) { - LOGGER.debug("Error setting the correlationId: {}", replyTo.toString()); - } - } - - /** - * Gets the JMSReplyTo from the message. - * - * @param message the message - * @return the reply to, can be <tt>null</tt> - */ - public static Destination getJMSReplyTo(Message message) { - try { - return message.getJMSReplyTo(); - } catch (Exception e) { - // ignore due OracleAQ does not support accessing JMSReplyTo - } - - return null; - } - - /** - * Gets the JMSType from the message. 
- * - * @param message the message - * @return the type, can be <tt>null</tt> - */ - public static String getJMSType(Message message) { - try { - return message.getJMSType(); - } catch (Exception e) { - // ignore due OracleAQ does not support accessing JMSType - } - - return null; - } - - /** - * Gets the JMSRedelivered from the message. - * - * @param message the message - * @return <tt>true</tt> if redelivered, <tt>false</tt> if not, - * <tt>null</tt> if not able to determine - */ - public static Boolean getJMSRedelivered(Message message) { - try { - return message.getJMSRedelivered(); - } catch (Exception e) { - // ignore if JMS broker do not support this - } - - return null; - } - - /** - * Sets the JMSDeliveryMode on the message. - * - * @param exchange the exchange - * @param message the message - * @param deliveryMode the delivery mode, either as a String or integer - * @throws javax.jms.JMSException is thrown if error setting the delivery - * mode - */ - public static void setJMSDeliveryMode(Exchange exchange, Message message, Object deliveryMode) throws JMSException { - Integer mode = null; - - if (deliveryMode instanceof String) { - String s = (String) deliveryMode; - if ("PERSISTENT".equalsIgnoreCase(s)) { - mode = DeliveryMode.PERSISTENT; - } else if ("NON_PERSISTENT".equalsIgnoreCase(s)) { - mode = DeliveryMode.NON_PERSISTENT; - } else { - // it may be a number in the String so try that - Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode); - if (value != null) { - mode = value; - } else { - throw new IllegalArgumentException("Unknown delivery mode with value: " + deliveryMode); - } - } - } else { - // fallback and try to convert to a number - Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode); - if (value != null) { - mode = value; - } - } - - if (mode != null) { - message.setJMSDeliveryMode(mode); - message.setIntProperty(JmsConstants.JMS_DELIVERY_MODE, mode); - } - } - - @Deprecated - 
/** - * Please use the one which has the parameter of keyFormatStrategy - */ - public static Exchange setJmsMessageHeaders(final Message jmsMessage, final Exchange exchange, boolean out) throws JMSException { - return setJmsMessageHeaders(jmsMessage, exchange, out, new DefaultJmsKeyFormatStrategy()); - } - - @SuppressWarnings("unchecked") - public static Exchange setJmsMessageHeaders(final Message jmsMessage, final Exchange exchange, boolean out, KeyFormatStrategy keyFormatStrategy) throws JMSException { - Map<String, Object> headers = new HashMap<String, Object>(); - if (jmsMessage != null) { - // lets populate the standard JMS message headers - try { - headers.put(JmsMessageHeaderType.JMSCorrelationID.toString(), jmsMessage.getJMSCorrelationID()); - headers.put(JmsMessageHeaderType.JMSDeliveryMode.toString(), jmsMessage.getJMSDeliveryMode()); - headers.put(JmsMessageHeaderType.JMSDestination.toString(), jmsMessage.getJMSDestination()); - headers.put(JmsMessageHeaderType.JMSExpiration.toString(), jmsMessage.getJMSExpiration()); - headers.put(JmsMessageHeaderType.JMSMessageID.toString(), jmsMessage.getJMSMessageID()); - headers.put(JmsMessageHeaderType.JMSPriority.toString(), jmsMessage.getJMSPriority()); - headers.put(JmsMessageHeaderType.JMSRedelivered.toString(), jmsMessage.getJMSRedelivered()); - headers.put(JmsMessageHeaderType.JMSTimestamp.toString(), jmsMessage.getJMSTimestamp()); - headers.put(JmsMessageHeaderType.JMSReplyTo.toString(), SjmsExchangeMessageHelper.getJMSReplyTo(jmsMessage)); - headers.put(JmsMessageHeaderType.JMSType.toString(), SjmsExchangeMessageHelper.getJMSType(jmsMessage)); - - // this works around a bug in the ActiveMQ property handling - headers.put(JmsMessageHeaderType.JMSXGroupID.toString(), jmsMessage.getStringProperty(JmsMessageHeaderType.JMSXGroupID.toString())); - } catch (JMSException e) { - throw new RuntimeCamelException(e); - } - - for (Enumeration<String> enumeration = jmsMessage.getPropertyNames(); 
enumeration.hasMoreElements();) { - String key = enumeration.nextElement(); - if (hasIllegalHeaderKey(key)) { - throw new IllegalHeaderException("Header " + key + " is not a legal JMS header name value"); - } - Object value = jmsMessage.getObjectProperty(key); - String decodedName = keyFormatStrategy.decodeKey(key); - headers.put(decodedName, value); - } - } - if (out) { - exchange.getOut().setHeaders(headers); - } else { - exchange.getIn().setHeaders(headers); - } - return exchange; - } - - public static Message createMessage(Exchange exchange, Session session, KeyFormatStrategy keyFormatStrategy, TypeConverter typeConverter) throws Exception { - Message answer = null; - Object body = null; - Map<String, Object> bodyHeaders = null; - - - if (exchange.getOut().getBody() != null) { - body = exchange.getOut().getBody(); - bodyHeaders = new HashMap<String, Object>(exchange.getOut().getHeaders()); - } else { - body = exchange.getIn().getBody(); - bodyHeaders = new HashMap<String, Object>(exchange.getIn().getHeaders()); - } - - answer = JmsMessageHelper.createMessage(session, body, bodyHeaders, keyFormatStrategy, typeConverter); - - return answer; - } - - private static boolean hasIllegalHeaderKey(String key) { - if (key == null) { - return true; - } - if (key.equals("")) { - return true; - } - if (key.indexOf(".") > -1) { - return true; - } - if (key.indexOf("-") > -1) { - return true; - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java index 763b2a7..c3c5e55 100644 --- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java @@ -25,8 +25,7 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.sjms.SjmsEndpoint; -import org.apache.camel.component.sjms.SjmsExchangeMessageHelper; -import org.apache.camel.component.sjms.TransactionCommitStrategy; +import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.spi.Synchronization; import org.slf4j.Logger; @@ -50,7 +49,6 @@ public abstract class AbstractMessageHandler implements MessageListener { private boolean synchronous = true; private Synchronization synchronization; private boolean topic; - private TransactionCommitStrategy commitStrategy; public AbstractMessageHandler(SjmsEndpoint endpoint, ExecutorService executor) { this.endpoint = endpoint; @@ -72,7 +70,7 @@ public abstract class AbstractMessageHandler implements MessageListener { public void onMessage(Message message) { RuntimeCamelException rce = null; try { - final DefaultExchange exchange = (DefaultExchange) SjmsExchangeMessageHelper.createExchange(message, getEndpoint()); + final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, getEndpoint()); log.debug("Processing Exchange.id:{}", exchange.getExchangeId()); @@ -92,9 +90,7 @@ public abstract class AbstractMessageHandler implements MessageListener { handleMessage(exchange); } catch (Exception e) { exchange.setException(e); -// ObjectHelper.wrapRuntimeCamelException(e); } - } }); } @@ -169,8 +165,4 @@ public abstract class AbstractMessageHandler implements MessageListener { public boolean isTopic() { return topic; } - - public TransactionCommitStrategy getCommitStrategy() { - return commitStrategy; - } } 
http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java index da658b7..ae27d27 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java @@ -46,16 +46,14 @@ public class InOnlyMessageHandler extends AbstractMessageHandler { } /** - * @param message + * @param exchange */ @Override public void handleMessage(final Exchange exchange) { if (log.isDebugEnabled()) { log.debug("Handling InOnly Message: {}", exchange.getIn().getBody()); } - if (exchange.isFailed()) { - return; - } else { + if (!exchange.isFailed()) { NoOpAsyncCallback callback = new NoOpAsyncCallback(); if (isTransacted() || isSynchronous()) { // must process synchronous if transacted or configured to @@ -72,7 +70,7 @@ public class InOnlyMessageHandler extends AbstractMessageHandler { } } else { // process asynchronous using the async routing engine - log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri()); + log.debug("Asynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri()); getProcessor().process(exchange, callback); } http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java ---------------------------------------------------------------------- diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java index 9be8d2f..25a0f1d 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java @@ -31,10 +31,9 @@ import javax.jms.Topic; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.component.sjms.SjmsEndpoint; -import org.apache.camel.component.sjms.SjmsExchangeMessageHelper; +import org.apache.camel.component.sjms.jms.JmsConstants; import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.spi.Synchronization; -import org.apache.camel.util.ObjectHelper; /** * cache manager to store and purge unused cashed producers or we will have a @@ -88,7 +87,7 @@ public class InOutMessageHandler extends AbstractMessageHandler { } } - MessageHanderAsyncCallback callback = new MessageHanderAsyncCallback(exchange, messageProducer); + MessageHandlerAsyncCallback callback = new MessageHandlerAsyncCallback(exchange, messageProducer); if (exchange.isFailed()) { return; } else { @@ -105,7 +104,7 @@ public class InOutMessageHandler extends AbstractMessageHandler { } } else { // process asynchronous using the async routing engine - log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri()); + log.debug("Asynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), getEndpoint().getEndpointUri()); getProcessor().process(exchange, callback); } } @@ -120,12 +119,11 @@ public class InOutMessageHandler extends AbstractMessageHandler { @Override public void close() { - for (String key : producerCache.keySet()) { - MessageProducer mp = producerCache.get(key); + for (final 
Map.Entry<String,MessageProducer> entry : producerCache.entrySet()) { try { - mp.close(); + entry.getValue().close(); } catch (JMSException e) { - ObjectHelper.wrapRuntimeCamelException(e); + log.debug("Cached MessageProducer with key:{} threw an unexpected exception", entry.getKey(), e); } } producerCache.clear(); @@ -146,12 +144,12 @@ public class InOutMessageHandler extends AbstractMessageHandler { return answer; } - protected class MessageHanderAsyncCallback implements AsyncCallback { + protected class MessageHandlerAsyncCallback implements AsyncCallback { - private Exchange exchange; - private MessageProducer localProducer; + private final Exchange exchange; + private final MessageProducer localProducer; - public MessageHanderAsyncCallback(Exchange exchange, MessageProducer localProducer) { + public MessageHandlerAsyncCallback(Exchange exchange, MessageProducer localProducer) { this.exchange = exchange; this.localProducer = localProducer; } @@ -159,9 +157,8 @@ public class InOutMessageHandler extends AbstractMessageHandler { @Override public void done(boolean sync) { try { - Message response = SjmsExchangeMessageHelper.createMessage(exchange, getSession(), - getEndpoint().getJmsKeyFormatStrategy(), getEndpoint().getCamelContext().getTypeConverter()); - response.setJMSCorrelationID(exchange.getIn().getHeader("JMSCorrelationID", String.class)); + Message response = JmsMessageHelper.createMessage(exchange, getSession(), getEndpoint()); + response.setJMSCorrelationID(exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class)); localProducer.send(response); } catch (Exception e) { exchange.setException(e); http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java index d92d2c8..725cc20 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java @@ -19,15 +19,59 @@ package org.apache.camel.component.sjms.jms; /** * JMS constants */ -public final class JmsConstants { +public interface JmsConstants { - public static final String JMS_DESTINATION = "CamelJmsDestination"; - public static final String JMS_DESTINATION_NAME = "CamelJmsDestinationName"; - public static final String JMS_MESSAGE_TYPE = "CamelJmsMessageType"; - public static final String JMS_DELIVERY_MODE = "CamelJmsDeliveryMode"; + /** + * Set by the publishing Client + */ + public static final String JMS_CORRELATION_ID = "JMSCorrelationID"; + /** + * Set on the send or publish event + */ + public static final String JMS_DELIVERY_MODE = "JMSDeliveryMode"; + /** + * Set on the send or publish event + */ + public static final String JMS_DESTINATION = "JMSDestination"; + /** + * Set on the send or publish event + */ + public static final String JMS_EXPIRATION = "JMSExpiration"; + /** + * Set on the send or publish event + */ + public static final String JMS_MESSAGE_ID = "JMSMessageID"; + /** + * Set on the send or publish event + */ + public static final String JMS_PRIORITY = "JMSPriority"; + /** + * A redelivery flag set by the JMS provider + */ + public static final String JMS_REDELIVERED = "JMSRedelivered"; + /** + * The JMS Reply To {@link javax.jms.Destination} set by the publishing Client + */ + public static final String JMS_REPLY_TO = "JMSReplyTo"; + /** + * Set on the send or publish event + */ + public static final String JMS_TIMESTAMP = "JMSTimestamp"; + /** + * Set by the publishing Client + */ + public static final String JMS_TYPE = "JMSType"; - private JmsConstants() { - // utility class - } + /** + * Custom headers + */ + public 
static final String JMSX_GROUP_ID = "JMSXGroupID"; -} + + /** + * String representation of JMS delivery modes. + */ + public static final String JMS_DELIVERY_MODE_PERSISTENT = "PERSISTENT"; + public static final String JMS_DELIVERY_MODE_NON_PERSISTENT = "NON_PERSISTENT"; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java index 101423b..e69de29 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.component.sjms.jms; - -public enum JmsMessageHeaderType { - JMSDestination, - JMSDeliveryMode, - JMSExpiration, - JMSPriority, - JMSMessageID, - JMSTimestamp, - JMSCorrelationID, - JMSReplyTo, - JMSType, - JMSRedelivered, - - /* - * Add known custom headers - */ - JMSXGroupID - -} http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java index 79c57be..5ca09b4 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java @@ -16,15 +16,17 @@ */ package org.apache.camel.component.sjms.jms; -import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.FileInputStream; import java.io.InputStream; import java.io.Reader; import java.io.Serializable; import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.util.ArrayList; import java.util.Collection; +import java.util.Enumeration; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import javax.jms.BytesMessage; @@ -38,63 +40,123 @@ import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.TypeConverter; +import org.apache.camel.component.sjms.SjmsConstants; +import org.apache.camel.component.sjms.SjmsEndpoint; +import org.apache.camel.impl.DefaultMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Utility class for {@link 
javax.jms.Message}. */ -public final class JmsMessageHelper { - - /** - * Set by the publishing Client - */ - public static final String JMS_CORRELATION_ID = "JMSCorrelationID"; - /** - * Set on the send or publish event - */ - public static final String JMS_DELIVERY_MODE = "JMSDeliveryMode"; - /** - * Set on the send or publish event - */ - public static final String JMS_DESTINATION = "JMSDestination"; - /** - * Set on the send or publish event - */ - public static final String JMS_EXPIRATION = "JMSExpiration"; - /** - * Set on the send or publish event - */ - public static final String JMS_MESSAGE_ID = "JMSMessageID"; - /** - * Set on the send or publish event - */ - public static final String JMS_PRIORITY = "JMSPriority"; - /** - * A redelivery flag set by the JMS provider - */ - public static final String JMS_REDELIVERED = "JMSRedelivered"; - /** - * The JMS Reply To {@link Destination} set by the publishing Client - */ - public static final String JMS_REPLY_TO = "JMSReplyTo"; - /** - * Set on the send or publish event - */ - public static final String JMS_TIMESTAMP = "JMSTimestamp"; - /** - * Set by the publishing Client - */ - public static final String JMS_TYPE = "JMSType"; +public final class JmsMessageHelper implements JmsConstants { private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageHelper.class); private JmsMessageHelper() { } - //@SuppressWarnings("unchecked") - public static Message createMessage(Session session, Object payload, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy, TypeConverter typeConverter) throws Exception { + public static Exchange createExchange(Message message, Endpoint endpoint) { + Exchange exchange = endpoint.createExchange(); + return populateExchange(message, exchange, false, ((SjmsEndpoint)endpoint).getJmsKeyFormatStrategy()); + } + + @SuppressWarnings("unchecked") + public static Exchange populateExchange(Message message, Exchange exchange, boolean out, KeyFormatStrategy 
keyFormatStrategy) { + try { + setJmsMessageHeaders(message, exchange, out, keyFormatStrategy); + if (message != null) { + // convert to JMS Message of the given type + + DefaultMessage bodyMessage; + if (out) { + bodyMessage = (DefaultMessage) exchange.getOut(); + } else { + bodyMessage = (DefaultMessage) exchange.getIn(); + } + switch (JmsMessageHelper.discoverJmsMessageType(message)) { + case Bytes: + BytesMessage bytesMessage = (BytesMessage) message; + if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) { + LOGGER.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength()); + return null; + } + byte[] result = new byte[(int) bytesMessage.getBodyLength()]; + bytesMessage.readBytes(result); + bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Bytes); + bodyMessage.setBody(result); + break; + case Map: + Map<String, Object> body = new HashMap<>(); + MapMessage mapMessage = (MapMessage) message; + Enumeration<String> names = mapMessage.getMapNames(); + while (names.hasMoreElements()) { + String key = names.nextElement(); + Object value = mapMessage.getObject(key); + body.put(key, value); + } + bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Map); + bodyMessage.setBody(body); + break; + case Object: + ObjectMessage objMsg = (ObjectMessage) message; + bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Object); + bodyMessage.setBody(objMsg.getObject()); + break; + case Text: + TextMessage textMsg = (TextMessage) message; + bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Text); + bodyMessage.setBody(textMsg.getText()); + break; + case Stream: + StreamMessage streamMessage = (StreamMessage) message; + List list = new ArrayList<>(); + Object obj; + while ((obj = streamMessage.readObject()) != null) { + list.add(obj); + } + bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Stream); + bodyMessage.setBody(list); + break; + case Message: + default: + // Do 
nothing. Only set the headers for an empty message + bodyMessage.setBody(message); + break; + } + } + } catch (Exception e) { + exchange.setException(e); + } + return exchange; + } + + public static Message createMessage(Exchange exchange, Session session, SjmsEndpoint endpoint) throws Exception { + Message answer; + Object body; + Map<String, Object> bodyHeaders; + + if (exchange.getOut().getBody() != null) { + body = exchange.getOut().getBody(); + bodyHeaders = new HashMap<>(exchange.getOut().getHeaders()); + } else { + body = exchange.getIn().getBody(); + bodyHeaders = new HashMap<>(exchange.getIn().getHeaders()); + } + + answer = createMessage(session, body, bodyHeaders, endpoint); + return answer; + } + + public static Message createMessage(Session session, Object payload, Map<String, Object> messageHeaders, SjmsEndpoint endpoint) throws Exception { + return createMessage(session, payload, messageHeaders, endpoint.getJmsKeyFormatStrategy(), endpoint.getCamelContext().getTypeConverter()); + } + + private static Message createMessage(Session session, Object payload, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy, TypeConverter typeConverter) throws Exception { Message answer = null; JmsMessageType messageType = JmsMessageHelper.discoverMessageTypeFromPayload(payload); try { @@ -108,11 +170,9 @@ public final class JmsMessageHelper { break; case Map: MapMessage mapMessage = session.createMapMessage(); - Map<String, Object> objMap = (Map<String, Object>) payload; - Set<String> keys = objMap.keySet(); - for (String key : keys) { - Object value = objMap.get(key); - mapMessage.setObject(key, value); + Map objMap = (Map) payload; + for (final Map.Entry entry : (Set<Map.Entry>)objMap.entrySet()) { + mapMessage.setObject(entry.getKey().toString(), entry.getValue()); } answer = mapMessage; break; @@ -128,18 +188,12 @@ public final class JmsMessageHelper { answer = textMessage; break; case Stream: - ByteArrayOutputStream baos = new 
ByteArrayOutputStream(); - InputStream is = typeConverter.convertTo(InputStream.class, payload); - int reads = is.read(); - while (reads != -1) { - baos.write(reads); - reads = is.read(); + StreamMessage streamMessage = session.createStreamMessage(); + Collection collection = (Collection)payload; + for (final Object obj : collection) { + streamMessage.writeObject(obj); } - BytesMessage bytesStreamMessage = session.createBytesMessage(); - bytesStreamMessage.writeBytes(baos.toByteArray()); - baos.close(); - is.close(); - answer = bytesStreamMessage; + answer = streamMessage; break; default: break; @@ -165,19 +219,11 @@ public final class JmsMessageHelper { * {@link DefaultJmsKeyFormatStrategy} will be used. * @return {@link Message} */ - public static Message setJmsMessageHeaders(final Message jmsMessage, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws IllegalHeaderException { - // Support for the null keyFormatStrategy - KeyFormatStrategy localKeyFormatStrategy = null; - if (keyFormatStrategy == null) { - localKeyFormatStrategy = new DefaultJmsKeyFormatStrategy(); - } else { - localKeyFormatStrategy = keyFormatStrategy; - } - - Map<String, Object> headers = new HashMap<String, Object>(messageHeaders); - Set<String> keys = headers.keySet(); - for (String headerName : keys) { - Object headerValue = headers.get(headerName); + private static Message setJmsMessageHeaders(final Message jmsMessage, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws IllegalHeaderException { + Map<String, Object> headers = new HashMap<>(messageHeaders); + for (final Map.Entry<String,Object> entry : headers.entrySet()) { + String headerName = entry.getKey(); + Object headerValue = entry.getValue(); if (headerName.equalsIgnoreCase(JMS_CORRELATION_ID)) { if (headerValue == null) { @@ -241,8 +287,8 @@ public final class JmsMessageHelper { } } else { LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue); - if 
(headerName.equalsIgnoreCase(JMS_DESTINATION) || headerName.equalsIgnoreCase(JMS_MESSAGE_ID) || headerName.equalsIgnoreCase("JMSTimestamp") - || headerName.equalsIgnoreCase("JMSRedelivered")) { + if (headerName.equalsIgnoreCase(JMS_DESTINATION) || headerName.equalsIgnoreCase(JMS_MESSAGE_ID) || headerName.equalsIgnoreCase(JMS_TIMESTAMP) + || headerName.equalsIgnoreCase(JMS_REDELIVERED)) { // The following properties are set by the // MessageProducer: // JMSDestination @@ -252,7 +298,7 @@ public final class JmsMessageHelper { LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue); } else { if (!(headerValue instanceof JmsMessageType)) { - String encodedName = localKeyFormatStrategy.encodeKey(headerName); + String encodedName = keyFormatStrategy.encodeKey(headerName); try { JmsMessageHelper.setProperty(jmsMessage, encodedName, headerValue); } catch (JMSException e) { @@ -260,12 +306,68 @@ public final class JmsMessageHelper { } } } - // } } } return jmsMessage; } + @SuppressWarnings("unchecked") + public static Exchange setJmsMessageHeaders(final Message jmsMessage, final Exchange exchange, boolean out, KeyFormatStrategy keyFormatStrategy) throws JMSException { + Map<String, Object> headers = new HashMap<>(); + if (jmsMessage != null) { + // lets populate the standard JMS message headers + try { + headers.put(JMS_CORRELATION_ID, jmsMessage.getJMSCorrelationID()); + headers.put(JMS_DELIVERY_MODE, jmsMessage.getJMSDeliveryMode()); + headers.put(JMS_DESTINATION, jmsMessage.getJMSDestination()); + headers.put(JMS_EXPIRATION, jmsMessage.getJMSExpiration()); + headers.put(JMS_MESSAGE_ID, jmsMessage.getJMSMessageID()); + headers.put(JMS_PRIORITY, jmsMessage.getJMSPriority()); + headers.put(JMS_REDELIVERED, jmsMessage.getJMSRedelivered()); + headers.put(JMS_TIMESTAMP, jmsMessage.getJMSTimestamp()); + headers.put(JMS_REPLY_TO, getJMSReplyTo(jmsMessage)); + headers.put(JMS_TYPE, getJMSType(jmsMessage)); + + // this works around a bug in the ActiveMQ 
property handling + headers.put(JMSX_GROUP_ID, jmsMessage.getStringProperty(JMSX_GROUP_ID)); + } catch (JMSException e) { + throw new RuntimeCamelException(e); + } + + for (final Enumeration<String> enumeration = jmsMessage.getPropertyNames(); enumeration.hasMoreElements();) { + String key = enumeration.nextElement(); + if (hasIllegalHeaderKey(key)) { + throw new IllegalHeaderException("Header " + key + " is not a legal JMS header name value"); + } + Object value = jmsMessage.getObjectProperty(key); + String decodedName = keyFormatStrategy.decodeKey(key); + headers.put(decodedName, value); + } + } + if (out) { + exchange.getOut().setHeaders(headers); + } else { + exchange.getIn().setHeaders(headers); + } + return exchange; + } + + /** + * Gets the JMSType from the message. + * + * @param message the message + * @return the type, can be <tt>null</tt> + */ + public static String getJMSType(Message message) { + try { + return message.getJMSType(); + } catch (Exception e) { + // ignore due OracleAQ does not support accessing JMSType + } + + return null; + } + /** * Sets the JMSDeliveryMode on the message. * @@ -278,9 +380,9 @@ public final class JmsMessageHelper { if (deliveryMode instanceof String) { String s = (String) deliveryMode; - if ("PERSISTENT".equalsIgnoreCase(s)) { + if (JMS_DELIVERY_MODE_PERSISTENT.equalsIgnoreCase(s)) { mode = DeliveryMode.PERSISTENT; - } else if ("NON_PERSISTENT".equalsIgnoreCase(s)) { + } else if (JMS_DELIVERY_MODE_NON_PERSISTENT.equalsIgnoreCase(s)) { mode = DeliveryMode.NON_PERSISTENT; } else { // it may be a number in the String so try that @@ -339,6 +441,35 @@ public final class JmsMessageHelper { } /** + * Sets the JMSReplyTo on the message. 
+ * + * @param message the message + * @param replyTo the reply to destination + */ + public static void setJMSReplyTo(Message message, Destination replyTo) { + try { + message.setJMSReplyTo(replyTo); + } catch (Exception e) { + LOGGER.debug("Error setting the correlationId: {}", replyTo.toString()); + } + } + + /** + * Gets the JMSReplyTo from the message. + * + * @param message the message + * @return the reply to, can be <tt>null</tt> + */ + public static Destination getJMSReplyTo(Message message) { + try { + return message.getJMSReplyTo(); + } catch (Exception e) { + // ignore due OracleAQ does not support accessing JMSReplyTo + } + return null; + } + + /** * Sets the property on the given JMS message. * * @param jmsMessage the JMS message @@ -348,9 +479,8 @@ public final class JmsMessageHelper { */ public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException { if (value == null) { - return; - } - if (value instanceof Byte) { + jmsMessage.setObjectProperty(name, null); + } else if (value instanceof Byte) { jmsMessage.setByteProperty(name, (Byte) value); } else if (value instanceof Boolean) { jmsMessage.setBooleanProperty(name, (Boolean) value); @@ -373,27 +503,31 @@ public final class JmsMessageHelper { } public static JmsMessageType discoverMessageTypeFromPayload(final Object payload) { - JmsMessageType answer = null; + JmsMessageType answer; // Default is a JMS Message since a body is not required if (payload == null) { answer = JmsMessageType.Message; } else { // Something was found in the body so determine // what type of message we need to create - if (Byte[].class.isInstance(payload)) { + if (byte[].class.isInstance(payload)) { answer = JmsMessageType.Bytes; - } else if (Collection.class.isInstance(payload)) { + } else if (Map.class.isInstance(payload)) { answer = JmsMessageType.Map; - } else if (InputStream.class.isInstance(payload)) { + } else if (Collection.class.isInstance(payload)) { answer = 
JmsMessageType.Stream; + } else if (InputStream.class.isInstance(payload)) { + answer = JmsMessageType.Bytes; } else if (ByteBuffer.class.isInstance(payload)) { answer = JmsMessageType.Bytes; } else if (File.class.isInstance(payload)) { answer = JmsMessageType.Bytes; } else if (Reader.class.isInstance(payload)) { - answer = JmsMessageType.Bytes; + answer = JmsMessageType.Text; } else if (String.class.isInstance(payload)) { answer = JmsMessageType.Text; + } else if (CharBuffer.class.isInstance(payload)) { + answer = JmsMessageType.Text; } else if (char[].class.isInstance(payload)) { answer = JmsMessageType.Text; } else if (Character.class.isInstance(payload)) { @@ -408,7 +542,7 @@ public final class JmsMessageHelper { } public static JmsMessageType discoverJmsMessageType(Message message) { - JmsMessageType answer = null; + JmsMessageType answer; if (message != null) { if (BytesMessage.class.isInstance(message)) { answer = JmsMessageType.Bytes; @@ -428,4 +562,9 @@ public final class JmsMessageHelper { } return answer; } -} + + private static boolean hasIllegalHeaderKey(String key) { + return key == null || key.isEmpty() || key.contains(".") || key.contains("-"); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java index 986a7bd..a1fbd7a 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java @@ -85,7 +85,7 @@ public class InOnlyProducer extends SjmsProducer { @Override public void sendMessage(final 
Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception { try { - Collection<Message> messages = new ArrayList<Message>(1); + Collection<Message> messages = new ArrayList<>(1); if (exchange.getIn().getBody() != null) { if (exchange.getIn().getBody() instanceof List) { Iterable<?> payload = (Iterable<?>) exchange.getIn().getBody(); @@ -93,19 +93,15 @@ public class InOnlyProducer extends SjmsProducer { Message message; if (BatchMessage.class.isInstance(object)) { BatchMessage<?> batchMessage = (BatchMessage<?>) object; - message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint() - .getJmsKeyFormatStrategy(), getSjmsEndpoint().getCamelContext().getTypeConverter()); + message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getEndpoint()); } else { - message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), - getSjmsEndpoint().getJmsKeyFormatStrategy(), getSjmsEndpoint().getCamelContext().getTypeConverter()); + message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getEndpoint()); } messages.add(message); } } else { Object payload = exchange.getIn().getBody(); - Message message = JmsMessageHelper - .createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy(), - getSjmsEndpoint().getCamelContext().getTypeConverter()); + Message message = JmsMessageHelper.createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getEndpoint()); messages.add(message); } } http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java ---------------------------------------------------------------------- diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java index be336b3..55f7019 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java @@ -19,12 +19,11 @@ package org.apache.camel.component.sjms.producer; import java.util.Map; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; @@ -39,8 +38,9 @@ import org.apache.camel.Exchange; import org.apache.camel.component.sjms.MessageConsumerResources; import org.apache.camel.component.sjms.MessageProducerResources; import org.apache.camel.component.sjms.SjmsEndpoint; -import org.apache.camel.component.sjms.SjmsExchangeMessageHelper; import org.apache.camel.component.sjms.SjmsProducer; +import org.apache.camel.component.sjms.jms.JmsConstants; +import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization; import org.apache.camel.util.ObjectHelper; @@ -52,12 +52,7 @@ import org.apache.commons.pool.impl.GenericObjectPool; */ public class InOutProducer extends SjmsProducer { - /** - * We use the {@link ReadWriteLock} to manage the {@link TreeMap} in place - * of a {@link ConcurrentMap} because due to significant performance gains. 
- */ - private static Map<String, Exchanger<Object>> exchangerMap = new TreeMap<String, Exchanger<Object>>(); - private ReadWriteLock lock = new ReentrantReadWriteLock(); + private static final Map<String, Exchanger<Object>> EXCHANGERS = new ConcurrentHashMap<>(); /** * A pool of {@link MessageConsumerResources} objects that are the reply @@ -87,16 +82,14 @@ public class InOutProducer extends SjmsProducer { messageConsumer.setMessageListener(new MessageListener() { @Override - public void onMessage(Message message) { - if (log.isDebugEnabled()) { - log.debug("Message Received in the Consumer Pool"); - log.debug(" Message : {}", message); - } + public void onMessage(final Message message) { + log.debug("Message Received in the Consumer Pool"); + log.debug(" Message : {}", message); try { - Exchanger<Object> exchanger = exchangerMap.get(message.getJMSCorrelationID()); + Exchanger<Object> exchanger = EXCHANGERS.get(message.getJMSCorrelationID()); exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS); } catch (Exception e) { - ObjectHelper.wrapRuntimeCamelException(e); + log.error("Unable to exchange message: {}", message, e); } } @@ -132,9 +125,8 @@ public class InOutProducer extends SjmsProducer { private GenericObjectPool<MessageConsumerResources> consumers; - public InOutProducer(SjmsEndpoint endpoint) { + public InOutProducer(final SjmsEndpoint endpoint) { super(endpoint); - endpoint.getConsumerCount(); } @Override @@ -195,28 +187,23 @@ public class InOutProducer extends SjmsProducer { exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy())); } - Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy(), getSjmsEndpoint().getCamelContext().getTypeConverter()); + Message request = JmsMessageHelper.createMessage(exchange, producer.getSession(), getEndpoint()); // TODO just set the correlation id don't 
get it from the // message - String correlationId = null; - if (exchange.getIn().getHeader("JMSCorrelationID", String.class) == null) { + String correlationId; + if (exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class) == null) { correlationId = UUID.randomUUID().toString().replace("-", ""); } else { - correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class); + correlationId = exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class); } Object responseObject = null; - Exchanger<Object> messageExchanger = new Exchanger<Object>(); - SjmsExchangeMessageHelper.setCorrelationId(request, correlationId); - lock.writeLock().lock(); - try { - exchangerMap.put(correlationId, messageExchanger); - } finally { - lock.writeLock().unlock(); - } + Exchanger<Object> messageExchanger = new Exchanger<>(); + JmsMessageHelper.setCorrelationId(request, correlationId); + EXCHANGERS.put(correlationId, messageExchanger); MessageConsumerResources consumer = consumers.borrowObject(); - SjmsExchangeMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination()); + JmsMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination()); consumers.returnObject(consumer); producer.getMessageProducer().send(request); @@ -231,13 +218,7 @@ public class InOutProducer extends SjmsProducer { try { responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS); - - lock.writeLock().lock(); - try { - exchangerMap.remove(correlationId); - } finally { - lock.writeLock().unlock(); - } + EXCHANGERS.remove(correlationId); } catch (InterruptedException e) { log.debug("Exchanger was interrupted while waiting on response", e); exchange.setException(e); @@ -251,7 +232,7 @@ public class InOutProducer extends SjmsProducer { exchange.setException((Throwable) responseObject); } else if (responseObject instanceof Message) { Message response = (Message) responseObject; - SjmsExchangeMessageHelper.populateExchange(response, 
exchange, true, getEndpoint().getJmsKeyFormatStrategy()); + JmsMessageHelper.populateExchange(response, exchange, true, getEndpoint().getJmsKeyFormatStrategy()); } else { exchange.setException(new CamelException("Unknown response type: " + responseObject)); } http://git-wip-us.apache.org/repos/asf/camel/blob/1dc631ee/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java index ab5d0ef..6a228b0 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java @@ -28,11 +28,15 @@ import java.io.InputStream; import java.io.Reader; import java.io.StringReader; import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.util.HashMap; +import java.util.Map; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.sjms.support.JmsTestSupport; +import org.junit.Ignore; import org.junit.Test; public class JMSMessageHelperTypeConversionTest extends JmsTestSupport { @@ -49,10 +53,45 @@ public class JMSMessageHelperTypeConversionTest extends JmsTestSupport { assertMockEndpointsSatisfied(); assertTrue(String.class.isInstance(message.getIn().getBody())); } - + + @Test + public void testJMSMessageHelperMap() throws Exception { + getMockEndpoint(MOCK_RESULT_URI).expectedMessageCount(1); + + Map<Object,Object> map = new HashMap<>(); + map.put("Hello", "Camel"); + map.put("Int", 
Integer.MAX_VALUE); + map.put("Boolean", Boolean.TRUE); + map.put(Boolean.TRUE, Long.MAX_VALUE); + + template.sendBody(SJMS_QUEUE_URI, map); + assertMockEndpointsSatisfied(); + assertTrue(Map.class.isInstance(message.getIn().getBody())); + assertEquals("Camel", message.getIn().getBody(Map.class).get("Hello")); + assertEquals(Integer.MAX_VALUE, message.getIn().getBody(Map.class).get("Int")); + assertEquals(Boolean.TRUE, message.getIn().getBody(Map.class).get("Boolean")); + assertEquals(Long.MAX_VALUE, message.getIn().getBody(Map.class).get("true")); + } + + @Ignore + @Test + public void testJMSMessageHelperCollection() throws Exception { + // TODO: Once SJMS can accept a Collection as Body + } + + @Test + public void testJMSMessageHelperByteArray() throws Exception { + getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes()); + + byte[] bytes = "Hello Camel".getBytes(); + template.sendBody(SJMS_QUEUE_URI, bytes); + assertMockEndpointsSatisfied(); + assertTrue(byte[].class.isInstance(message.getIn().getBody())); + } + @Test public void testJMSMessageHelperInputStream() throws Exception { - getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel"); + getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes()); String p = "Hello Camel"; InputStream is = new ByteArrayInputStream(p.getBytes()); template.sendBody(SJMS_QUEUE_URI, is); @@ -61,10 +100,19 @@ public class JMSMessageHelperTypeConversionTest extends JmsTestSupport { } @Test - public void testJMSMessageHelperByteBuffer() throws Exception { + public void testJMSMessageHelperCharBuffer() throws Exception { getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel"); + CharBuffer cb = CharBuffer.wrap("Hello Camel"); + template.sendBody(SJMS_QUEUE_URI, cb); + assertMockEndpointsSatisfied(); + assertTrue(String.class.isInstance(message.getIn().getBody())); + } + + @Test + public void testJMSMessageHelperByteBuffer() throws Exception { + 
getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes()); String p = "Hello Camel"; - ByteBuffer bb = ByteBuffer.wrap(p.getBytes()); + ByteBuffer bb = ByteBuffer.wrap(p.getBytes()); template.sendBody(SJMS_QUEUE_URI, bb); assertMockEndpointsSatisfied(); assertTrue(byte[].class.isInstance(message.getIn().getBody())); @@ -72,7 +120,7 @@ public class JMSMessageHelperTypeConversionTest extends JmsTestSupport { @Test public void testJMSMessageHelperFile() throws InterruptedException, IOException { - getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel"); + getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes()); String p = "Hello Camel"; File f = File.createTempFile("tmp-test", ".txt"); BufferedWriter bw = new BufferedWriter(new FileWriter(f)); @@ -96,9 +144,8 @@ public class JMSMessageHelperTypeConversionTest extends JmsTestSupport { Reader test = new BufferedReader(new FileReader(f.getAbsolutePath())); template.sendBody(SJMS_QUEUE_URI, test); assertMockEndpointsSatisfied(); - boolean resultDelete = f.delete(); - assertTrue(resultDelete); - assertTrue(byte[].class.isInstance(message.getIn().getBody())); + assertTrue(f.delete()); + assertTrue(String.class.isInstance(message.getIn().getBody())); } @Test @@ -108,7 +155,7 @@ public class JMSMessageHelperTypeConversionTest extends JmsTestSupport { StringReader test = new StringReader(p); template.sendBody(SJMS_QUEUE_URI, test); assertMockEndpointsSatisfied(); - assertTrue(byte[].class.isInstance(message.getIn().getBody())); + assertTrue(String.class.isInstance(message.getIn().getBody())); } @Test @@ -143,7 +190,7 @@ public class JMSMessageHelperTypeConversionTest extends JmsTestSupport { public void configure() throws Exception { interceptSendToEndpoint(MOCK_RESULT_URI).process(new Processor() { public void process(Exchange exchange) throws Exception { - message = (Exchange) exchange; + message = exchange; } });