Author: sully6768
Date: Tue Oct 30 19:46:53 2012
New Revision: 1403841

URL: http://svn.apache.org/viewvc?rev=1403841&view=rev
Log:
Added support for StreamMessage

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java?rev=1403841&r1=1403840&r2=1403841&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java Tue Oct 30 19:46:53 2012 @@ -16,8 +16,7 @@ */ package org.apache.camel.component.sjms; -import java.io.Serializable; -import java.util.Collection; +import java.io.ByteArrayOutputStream; import java.util.Enumeration; import java.util.HashMap; import java.util.LinkedHashMap; @@ -33,6 +32,7 @@ import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session; +import javax.jms.StreamMessage; import javax.jms.TextMessage; import org.apache.camel.Endpoint; @@ -42,7 +42,9 @@ import org.apache.camel.component.sjms.j import org.apache.camel.component.sjms.jms.IllegalHeaderException; import org.apache.camel.component.sjms.jms.JmsConstants; import 
org.apache.camel.component.sjms.jms.JmsMessageHeaderType; +import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.component.sjms.jms.JmsMessageType; +import org.apache.camel.component.sjms.jms.KeyFormatStrategy; import org.apache.camel.impl.DefaultMessage; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; @@ -83,7 +85,7 @@ public final class SjmsExchangeMessageHe } else { bodyMessage = (DefaultMessage)exchange.getIn(); } - switch (SjmsExchangeMessageHelper.discoverType(message)) { + switch (JmsMessageHelper.discoverJmsMessageType(message)) { case Bytes: BytesMessage bytesMessage = (BytesMessage)message; if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) { @@ -117,6 +119,18 @@ public final class SjmsExchangeMessageHe bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Text); bodyMessage.setBody(textMsg.getText()); break; + case Stream: + StreamMessage streamMessage = (StreamMessage)message; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + int next = streamMessage.readByte(); + while (next > -1) { + baos.write(next); + next = streamMessage.readByte(); + } + baos.flush(); + bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Bytes); + bodyMessage.setBody(baos.toByteArray()); + break; case Message: default: // Do nothing. 
Only set the headers for an empty message @@ -417,47 +431,6 @@ public final class SjmsExchangeMessageHe return jmsMessage; } - public static JmsMessageType discoverType(Message value) throws Exception { - JmsMessageType answer = null; - if (value != null) { - if (Message.class.isInstance(value)) { - if (BytesMessage.class.isInstance(value)) { - answer = JmsMessageType.Bytes; - } else if (MapMessage.class.isInstance(value)) { - answer = JmsMessageType.Map; - } else if (TextMessage.class.isInstance(value)) { - answer = JmsMessageType.Text; - } else if (ObjectMessage.class.isInstance(value)) { - answer = JmsMessageType.Object; - } else { - answer = JmsMessageType.Message; - } - } - } - return answer; - } - - public static JmsMessageType discoverType(final Exchange exchange) { - JmsMessageType answer = (JmsMessageType)exchange.getIn().getHeader(JMS_MESSAGE_TYPE); - if (answer == null) { - final Object value = exchange.getIn().getBody(); - if (value != null) { - if (Byte[].class.isInstance(value)) { - answer = JmsMessageType.Bytes; - } else if (Collection.class.isInstance(value)) { - answer = JmsMessageType.Map; - } else if (String.class.isInstance(value)) { - answer = JmsMessageType.Text; - } else if (Serializable.class.isInstance(value)) { - answer = JmsMessageType.Object; - } else { - answer = JmsMessageType.Message; - } - } - } - return answer; - } - @SuppressWarnings("unchecked") public static Exchange setJmsMessageHeaders(final Message jmsMessage, final Exchange exchange, boolean out) throws JMSException { HashMap<String, Object> headers = new HashMap<String, Object>(); @@ -497,59 +470,23 @@ public final class SjmsExchangeMessageHe } return exchange; } - - public static Message createMessage(Exchange exchange, Session session) throws Exception { - return createMessage(exchange, session, false); - } - - @SuppressWarnings("unchecked") - public static Message createMessage(Exchange exchange, Session session, boolean out) throws Exception { + + public static Message 
createMessage(Exchange exchange, Session session, KeyFormatStrategy keyFormatStrategy) throws Exception { Message answer = null; Object body = null; - try { - if (out && exchange.getOut().getBody() != null) { - body = exchange.getOut().getBody(); - } else { - body = exchange.getIn().getBody(); - } - JmsMessageType messageType = SjmsExchangeMessageHelper.discoverType(exchange); + Map<String, Object> bodyHeaders = null; - switch (messageType) { - case Bytes: - BytesMessage bytesMessage = session.createBytesMessage(); - bytesMessage.writeBytes((byte[])body); - answer = bytesMessage; - break; - case Map: - MapMessage mapMessage = session.createMapMessage(); - Map<String, Object> objMap = (Map<String, Object>)body; - Set<String> keys = objMap.keySet(); - for (String key : keys) { - Object value = objMap.get(key); - mapMessage.setObject(key, value); - } - answer = mapMessage; - break; - case Object: - ObjectMessage objectMessage = session.createObjectMessage(); - objectMessage.setObject((Serializable)body); - answer = objectMessage; - break; - case Text: - TextMessage textMessage = session.createTextMessage(); - textMessage.setText((String)body); - answer = textMessage; - break; - default: - answer = session.createMessage(); - break; - } - } catch (Exception e) { - LOGGER.error("TODO Auto-generated catch block", e); - throw e; + + if (exchange.getOut().getBody() != null) { + body = exchange.getOut().getBody(); + bodyHeaders = new HashMap<String, Object>(exchange.getOut().getHeaders()); + } else { + body = exchange.getIn().getBody(); + bodyHeaders = new HashMap<String, Object>(exchange.getIn().getHeaders()); } - - answer = SjmsExchangeMessageHelper.setJmsMessageHeaders(exchange, answer); + + answer = JmsMessageHelper.createMessage(session, body, bodyHeaders, keyFormatStrategy); + return answer; } Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java?rev=1403841&r1=1403840&r2=1403841&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java Tue Oct 30 19:46:53 2012 @@ -16,10 +16,6 @@ */ package org.apache.camel.component.sjms; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; import java.util.concurrent.ExecutorService; import javax.jms.MessageProducer; @@ -159,19 +155,15 @@ public abstract class SjmsProducer exten @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { - log.debug("Processing Exchange.id:{}", exchange.getExchangeId()); - - Object body = exchange.getIn().getBody(); - if (body != null) { - if (body instanceof InputStream) { - byte[] bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class, body); - exchange.getIn().setBody(bytes); - } + if (log.isDebugEnabled()) { + log.debug("Processing Exchange.id:{}", exchange.getExchangeId()); } try { if (!isSynchronous()) { - log.debug(" Sending message asynchronously: {}", exchange.getIn().getBody()); + if (log.isDebugEnabled()) { + log.debug(" Sending message asynchronously: {}", exchange.getIn().getBody()); + } getExecutor().execute(new Runnable() { @Override public void run() { @@ -183,12 +175,18 @@ public abstract class SjmsProducer exten } }); } else { - log.debug(" Sending message synchronously: {}", exchange.getIn().getBody()); + if (log.isDebugEnabled()) { + log.debug(" Sending message synchronously: {}", exchange.getIn().getBody()); + } sendMessage(exchange, callback); } } catch (Exception e) { - log.debug("Processing Exchange.id:{}", 
exchange.getExchangeId() + " - FAILED"); - log.trace("Exception: " + e.getLocalizedMessage(), e); + if (log.isDebugEnabled()) { + log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - FAILED"); + } + if (log.isDebugEnabled()) { + log.trace("Exception: " + e.getLocalizedMessage(), e); + } exchange.setException(e); } log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - SUCCESS"); @@ -196,45 +194,6 @@ public abstract class SjmsProducer exten return isSynchronous(); } - public static byte[] getBytes(InputStream is) throws IOException { - int len; - int size = 1024; - byte[] buf; - - if (is instanceof ByteArrayInputStream) { - size = is.available(); - buf = new byte[size]; - len = is.read(buf, 0, size); - } else { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - buf = new byte[size]; - while ((len = is.read(buf, 0, size)) != -1) { - bos.write(buf, 0, len); - } - buf = bos.toByteArray(); - } - return buf; - } - -// public static byte[] getBytesFromStream(InputStream is) throws IOException { -// BufferedInputStream bis = new BufferedInputStream(is); -// bis.available(); -// -// // Create the byte array to hold the data -// byte[] bytes = new byte[(int)bis.available()]; -// -// // Read in the bytes -// int offset = 0; -// int numRead = 0; -// while (offset < bytes.length && (numRead = is.read(bytes, offset, bytes.length - offset)) >= 0) { -// offset += numRead; -// } -// -// // Close the input stream and return bytes -// is.close(); -// return bytes; -// } - protected SjmsEndpoint getSjmsEndpoint() { return (SjmsEndpoint)this.getEndpoint(); Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java?rev=1403841&r1=1403840&r2=1403841&view=diff ============================================================================== 
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java Tue Oct 30 19:46:53 2012 @@ -16,7 +16,6 @@ */ package org.apache.camel.component.sjms.consumer; -import java.io.InputStream; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ExecutorService; @@ -33,7 +32,9 @@ import javax.jms.Topic; import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.component.sjms.SjmsEndpoint; import org.apache.camel.component.sjms.SjmsExchangeMessageHelper; +import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.spi.Synchronization; import org.apache.camel.util.AsyncProcessorHelper; @@ -77,7 +78,7 @@ public class InOutMessageHandler extends public void handleMessage(final Exchange exchange) { try { MessageProducer messageProducer = null; - Object obj = exchange.getIn().getHeader("JMSReplyTo"); + Object obj = exchange.getIn().getHeader(JmsMessageHelper.JMS_REPLY_TO); if (obj != null) { Destination replyTo = null; if (isDestination(obj)) { @@ -185,14 +186,7 @@ public class InOutMessageHandler extends public void done(boolean sync) { try { - Object body = exchange.getOut().getBody(); - if (body != null) { - if (body instanceof InputStream) { - byte[] bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class, body); - exchange.getOut().setBody(bytes); - } - } - Message response = SjmsExchangeMessageHelper.createMessage(exchange, getSession(), true); + Message response = SjmsExchangeMessageHelper.createMessage(exchange, getSession(), ((SjmsEndpoint)getEndpoint()).getJmsKeyFormatStrategy()); response.setJMSCorrelationID(exchange.getIn().getHeader("JMSCorrelationID", String.class)); 
localProducer.send(response); } catch (Exception e) { Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java?rev=1403841&r1=1403840&r2=1403841&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java Tue Oct 30 19:46:53 2012 @@ -16,6 +16,7 @@ */ package org.apache.camel.component.sjms.jms; +import java.io.InputStream; import java.io.Serializable; import java.util.Collection; import java.util.HashMap; @@ -30,6 +31,7 @@ import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session; +import javax.jms.StreamMessage; import javax.jms.TextMessage; import org.slf4j.Logger; @@ -91,7 +93,7 @@ public final class JmsMessageHelper { @SuppressWarnings("unchecked") public static Message createMessage(Session session, Object payload, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws Exception { Message answer = null; - JmsMessageType messageType = JmsMessageHelper.discoverPayloadType(payload); + JmsMessageType messageType = JmsMessageHelper.discoverMessgeTypeFromPayload(payload); try { switch (messageType) { @@ -360,13 +362,20 @@ public final class JmsMessageHelper { } } - public static JmsMessageType discoverPayloadType(Object payload) { + public static JmsMessageType discoverMessgeTypeFromPayload(final Object payload) { JmsMessageType answer = null; - if (payload != null) { + // Default is a JMS Message since a body is not required + if (payload == null) { + answer = JmsMessageType.Message; + } else { + // Something was found in the body so 
determine + // what type of message we need to create if (Byte[].class.isInstance(payload)) { answer = JmsMessageType.Bytes; } else if (Collection.class.isInstance(payload)) { answer = JmsMessageType.Map; + } else if (InputStream.class.isInstance(payload)) { + answer = JmsMessageType.Stream; } else if (String.class.isInstance(payload)) { answer = JmsMessageType.Text; } else if (Serializable.class.isInstance(payload)) { @@ -374,6 +383,27 @@ public final class JmsMessageHelper { } else { answer = JmsMessageType.Message; } + } + + return answer; + } + + public static JmsMessageType discoverJmsMessageType(Message message) { + JmsMessageType answer = null; + if (message != null) { + if (BytesMessage.class.isInstance(message)) { + answer = JmsMessageType.Bytes; + } else if (MapMessage.class.isInstance(message)) { + answer = JmsMessageType.Map; + } else if (TextMessage.class.isInstance(message)) { + answer = JmsMessageType.Text; + } else if (StreamMessage.class.isInstance(message)) { + answer = JmsMessageType.Stream; + } else if (ObjectMessage.class.isInstance(message)) { + answer = JmsMessageType.Object; + } else { + answer = JmsMessageType.Message; + } } else { answer = JmsMessageType.Message; } Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java?rev=1403841&r1=1403840&r2=1403841&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java Tue Oct 30 19:46:53 2012 @@ -29,10 +29,7 @@ public enum JmsMessageType { Bytes, Map, Object, - /** - * TODO Write support for Stream Messages - */ - //Stream, + Stream, Text, /** 
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java?rev=1403841&r1=1403840&r2=1403841&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java Tue Oct 30 19:46:53 2012 @@ -271,8 +271,8 @@ public class InOutProducer extends SjmsP if (isEndpointTransacted()) { exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy())); } - - Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession()); + + Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy()); // TODO just set the correlation id don't get it from the // message