Author: sully6768
Date: Tue Oct 30 19:46:53 2012
New Revision: 1403841

URL: http://svn.apache.org/viewvc?rev=1403841&view=rev
Log:
Added support for StreamMessage

Modified:
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java?rev=1403841&r1=1403840&r2=1403841&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsExchangeMessageHelper.java Tue Oct 30 19:46:53 2012
@@ -16,8 +16,7 @@
  */
 package org.apache.camel.component.sjms;
 
-import java.io.Serializable;
-import java.util.Collection;
+import java.io.ByteArrayOutputStream;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -33,6 +32,7 @@ import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.ObjectMessage;
 import javax.jms.Session;
+import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 
 import org.apache.camel.Endpoint;
@@ -42,7 +42,9 @@ import org.apache.camel.component.sjms.j
 import org.apache.camel.component.sjms.jms.IllegalHeaderException;
 import org.apache.camel.component.sjms.jms.JmsConstants;
 import org.apache.camel.component.sjms.jms.JmsMessageHeaderType;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.component.sjms.jms.JmsMessageType;
+import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -83,7 +85,7 @@ public final class SjmsExchangeMessageHe
                 } else {
                     bodyMessage = (DefaultMessage)exchange.getIn();
                 }
-                switch (SjmsExchangeMessageHelper.discoverType(message)) {
+                switch (JmsMessageHelper.discoverJmsMessageType(message)) {
                 case Bytes:
                     BytesMessage bytesMessage = (BytesMessage)message;
                     if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) {
@@ -117,6 +119,18 @@ public final class SjmsExchangeMessageHe
                     bodyMessage.setHeader(JMS_MESSAGE_TYPE, 
JmsMessageType.Text);
                     bodyMessage.setBody(textMsg.getText());
                     break;
+                case Stream:
+                    StreamMessage streamMessage = (StreamMessage)message;
+                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    int next = streamMessage.readByte();
+                    while (next > -1) {
+                        baos.write(next);
+                        next = streamMessage.readByte();
+                    }
+                    baos.flush();
+                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, 
JmsMessageType.Bytes);
+                    bodyMessage.setBody(baos.toByteArray());
+                    break;
                 case Message:
                 default:
                     // Do nothing. Only set the headers for an empty message
@@ -417,47 +431,6 @@ public final class SjmsExchangeMessageHe
         return jmsMessage;
     }
 
-    public static JmsMessageType discoverType(Message value) throws Exception {
-        JmsMessageType answer = null;
-        if (value != null) {
-            if (Message.class.isInstance(value)) {
-                if (BytesMessage.class.isInstance(value)) {
-                    answer = JmsMessageType.Bytes;
-                } else if (MapMessage.class.isInstance(value)) {
-                    answer = JmsMessageType.Map;
-                } else if (TextMessage.class.isInstance(value)) {
-                    answer = JmsMessageType.Text;
-                } else if (ObjectMessage.class.isInstance(value)) {
-                    answer = JmsMessageType.Object;
-                } else {
-                    answer = JmsMessageType.Message;
-                }
-            }
-        }
-        return answer;
-    }
-
-    public static JmsMessageType discoverType(final Exchange exchange) {
-        JmsMessageType answer = 
(JmsMessageType)exchange.getIn().getHeader(JMS_MESSAGE_TYPE);
-        if (answer == null) {
-            final Object value = exchange.getIn().getBody();
-            if (value != null) {
-                if (Byte[].class.isInstance(value)) {
-                    answer = JmsMessageType.Bytes;
-                } else if (Collection.class.isInstance(value)) {
-                    answer = JmsMessageType.Map;
-                } else if (String.class.isInstance(value)) {
-                    answer = JmsMessageType.Text;
-                } else if (Serializable.class.isInstance(value)) {
-                    answer = JmsMessageType.Object;
-                } else {
-                    answer = JmsMessageType.Message;
-                }
-            }
-        }
-        return answer;
-    }
-
     @SuppressWarnings("unchecked")
     public static Exchange setJmsMessageHeaders(final Message jmsMessage, 
final Exchange exchange, boolean out) throws JMSException {
         HashMap<String, Object> headers = new HashMap<String, Object>();
@@ -497,59 +470,23 @@ public final class SjmsExchangeMessageHe
         }
         return exchange;
     }
-
-    public static Message createMessage(Exchange exchange, Session session) 
throws Exception {
-        return createMessage(exchange, session, false);
-    }
-
-    @SuppressWarnings("unchecked")
-    public static Message createMessage(Exchange exchange, Session session, 
boolean out) throws Exception {
+    
+    public static Message createMessage(Exchange exchange, Session session, 
KeyFormatStrategy keyFormatStrategy) throws Exception {
         Message answer = null;
         Object body = null;
-        try {
-            if (out && exchange.getOut().getBody() != null) {
-                body = exchange.getOut().getBody();
-            } else {
-                body = exchange.getIn().getBody();
-            }
-            JmsMessageType messageType = 
SjmsExchangeMessageHelper.discoverType(exchange);
+        Map<String, Object> bodyHeaders = null;
 
-            switch (messageType) {
-            case Bytes:
-                BytesMessage bytesMessage = session.createBytesMessage();
-                bytesMessage.writeBytes((byte[])body);
-                answer = bytesMessage;
-                break;
-            case Map:
-                MapMessage mapMessage = session.createMapMessage();
-                Map<String, Object> objMap = (Map<String, Object>)body;
-                Set<String> keys = objMap.keySet();
-                for (String key : keys) {
-                    Object value = objMap.get(key);
-                    mapMessage.setObject(key, value);
-                }
-                answer = mapMessage;
-                break;
-            case Object:
-                ObjectMessage objectMessage = session.createObjectMessage();
-                objectMessage.setObject((Serializable)body);
-                answer = objectMessage;
-                break;
-            case Text:
-                TextMessage textMessage = session.createTextMessage();
-                textMessage.setText((String)body);
-                answer = textMessage;
-                break;
-            default:
-                answer = session.createMessage();
-                break;
-            }
-        } catch (Exception e) {
-            LOGGER.error("TODO Auto-generated catch block", e);
-            throw e;
+        
+        if (exchange.getOut().getBody() != null) {
+            body = exchange.getOut().getBody();
+            bodyHeaders = new HashMap<String, 
Object>(exchange.getOut().getHeaders());
+        } else {
+            body = exchange.getIn().getBody();
+            bodyHeaders = new HashMap<String, 
Object>(exchange.getIn().getHeaders());
         }
-
-        answer = SjmsExchangeMessageHelper.setJmsMessageHeaders(exchange, 
answer);
+        
+        answer = JmsMessageHelper.createMessage(session, body, bodyHeaders, 
keyFormatStrategy);
+        
         return answer;
     }
 

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java?rev=1403841&r1=1403840&r2=1403841&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java Tue Oct 30 19:46:53 2012
@@ -16,10 +16,6 @@
  */
 package org.apache.camel.component.sjms;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.concurrent.ExecutorService;
 
 import javax.jms.MessageProducer;
@@ -159,19 +155,15 @@ public abstract class SjmsProducer exten
 
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
-        log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
-        
-        Object body = exchange.getIn().getBody();
-        if (body != null) {
-            if (body instanceof InputStream) {
-                byte[] bytes = 
exchange.getContext().getTypeConverter().convertTo(byte[].class, body);
-                exchange.getIn().setBody(bytes);
-            }
+        if (log.isDebugEnabled()) {
+            log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
         }
         
         try {
             if (!isSynchronous()) {
-                log.debug("  Sending message asynchronously: {}", 
exchange.getIn().getBody());
+                if (log.isDebugEnabled()) {
+                    log.debug("  Sending message asynchronously: {}", 
exchange.getIn().getBody());
+                }
                 getExecutor().execute(new Runnable() {
                     @Override
                     public void run() {
@@ -183,12 +175,18 @@ public abstract class SjmsProducer exten
                     }
                 });
             } else {
-                log.debug("  Sending message synchronously: {}", 
exchange.getIn().getBody());
+                if (log.isDebugEnabled()) {
+                    log.debug("  Sending message synchronously: {}", 
exchange.getIn().getBody());
+                }
                 sendMessage(exchange, callback);
             }
         } catch (Exception e) {
-            log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + 
" - FAILED");
-            log.trace("Exception: " + e.getLocalizedMessage(), e);
+            if (log.isDebugEnabled()) {
+                log.debug("Processing Exchange.id:{}", 
exchange.getExchangeId() + " - FAILED");
+            }
+            if (log.isDebugEnabled()) {
+                log.trace("Exception: " + e.getLocalizedMessage(), e);
+            }
             exchange.setException(e);
         }
         log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - 
SUCCESS");
@@ -196,45 +194,6 @@ public abstract class SjmsProducer exten
         return isSynchronous();
     }
     
-    public static byte[] getBytes(InputStream is) throws IOException {
-        int len;
-        int size = 1024;
-        byte[] buf;
-
-        if (is instanceof ByteArrayInputStream) {
-            size = is.available();
-            buf = new byte[size];
-            len = is.read(buf, 0, size);
-        } else {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            buf = new byte[size];
-            while ((len = is.read(buf, 0, size)) != -1) {
-                bos.write(buf, 0, len);
-            }
-            buf = bos.toByteArray();
-        }
-        return buf;
-    }
-    
-//    public static byte[] getBytesFromStream(InputStream is) throws 
IOException {
-//        BufferedInputStream bis = new BufferedInputStream(is);
-//        bis.available();
-//
-//        // Create the byte array to hold the data
-//        byte[] bytes = new byte[(int)bis.available()];
-//
-//        // Read in the bytes
-//        int offset = 0;
-//        int numRead = 0;
-//        while (offset < bytes.length && (numRead = is.read(bytes, offset, 
bytes.length - offset)) >= 0) {
-//            offset += numRead;
-//        }
-//
-//        // Close the input stream and return bytes
-//        is.close();
-//        return bytes;
-//    }
-    
 
     protected SjmsEndpoint getSjmsEndpoint() {
         return (SjmsEndpoint)this.getEndpoint();

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java?rev=1403841&r1=1403840&r2=1403841&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java Tue Oct 30 19:46:53 2012
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.sjms.consumer;
 
-import java.io.InputStream;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
@@ -33,7 +32,9 @@ import javax.jms.Topic;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.SjmsEndpoint;
 import org.apache.camel.component.sjms.SjmsExchangeMessageHelper;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.component.sjms.jms.JmsObjectFactory;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -77,7 +78,7 @@ public class InOutMessageHandler extends
     public void handleMessage(final Exchange exchange) {
         try {
             MessageProducer messageProducer = null;
-            Object obj = exchange.getIn().getHeader("JMSReplyTo");
+            Object obj = 
exchange.getIn().getHeader(JmsMessageHelper.JMS_REPLY_TO);
             if (obj != null) {
                 Destination replyTo = null;
                 if (isDestination(obj)) {
@@ -185,14 +186,7 @@ public class InOutMessageHandler extends
         public void done(boolean sync) {
 
             try {
-                Object body = exchange.getOut().getBody();
-                if (body != null) {
-                    if (body instanceof InputStream) {
-                        byte[] bytes = 
exchange.getContext().getTypeConverter().convertTo(byte[].class, body);
-                        exchange.getOut().setBody(bytes);
-                    }
-                }
-                Message response = 
SjmsExchangeMessageHelper.createMessage(exchange, getSession(), true);
+                Message response = 
SjmsExchangeMessageHelper.createMessage(exchange, getSession(), 
((SjmsEndpoint)getEndpoint()).getJmsKeyFormatStrategy());
                 
response.setJMSCorrelationID(exchange.getIn().getHeader("JMSCorrelationID", 
String.class));
                 localProducer.send(response);
             } catch (Exception e) {

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java?rev=1403841&r1=1403840&r2=1403841&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java Tue Oct 30 19:46:53 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.sjms.jms;
 
+import java.io.InputStream;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
@@ -30,6 +31,7 @@ import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.ObjectMessage;
 import javax.jms.Session;
+import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 
 import org.slf4j.Logger;
@@ -91,7 +93,7 @@ public final class JmsMessageHelper {
     @SuppressWarnings("unchecked")
     public static Message createMessage(Session session, Object payload, 
Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws 
Exception {
         Message answer = null;
-        JmsMessageType messageType = 
JmsMessageHelper.discoverPayloadType(payload);
+        JmsMessageType messageType = 
JmsMessageHelper.discoverMessgeTypeFromPayload(payload);
         try {
 
             switch (messageType) {
@@ -360,13 +362,20 @@ public final class JmsMessageHelper {
         }
     }
 
-    public static JmsMessageType discoverPayloadType(Object payload) {
+    public static JmsMessageType discoverMessgeTypeFromPayload(final Object 
payload) {
         JmsMessageType answer = null;
-        if (payload != null) {
+        // Default is a JMS Message since a body is not required
+        if (payload == null) {
+            answer = JmsMessageType.Message;
+        } else {
+            // Something was found in the body so determine
+            // what type of message we need to create
             if (Byte[].class.isInstance(payload)) {
                 answer = JmsMessageType.Bytes;
             } else if (Collection.class.isInstance(payload)) {
                 answer = JmsMessageType.Map;
+            } else if (InputStream.class.isInstance(payload)) {
+                answer = JmsMessageType.Stream;
             } else if (String.class.isInstance(payload)) {
                 answer = JmsMessageType.Text;
             } else if (Serializable.class.isInstance(payload)) {
@@ -374,6 +383,27 @@ public final class JmsMessageHelper {
             } else {
                 answer = JmsMessageType.Message;
             }
+        }
+
+        return answer;
+    }
+
+    public static JmsMessageType discoverJmsMessageType(Message message) {
+        JmsMessageType answer = null;
+        if (message != null) {
+            if (BytesMessage.class.isInstance(message)) {
+                answer = JmsMessageType.Bytes;
+            } else if (MapMessage.class.isInstance(message)) {
+                answer = JmsMessageType.Map;
+            } else if (TextMessage.class.isInstance(message)) {
+                answer = JmsMessageType.Text;
+            } else if (StreamMessage.class.isInstance(message)) {
+                answer = JmsMessageType.Stream;
+            } else if (ObjectMessage.class.isInstance(message)) {
+                answer = JmsMessageType.Object;
+            } else {
+                answer = JmsMessageType.Message;
+            }
         } else {
             answer = JmsMessageType.Message;
         }

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java?rev=1403841&r1=1403840&r2=1403841&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java Tue Oct 30 19:46:53 2012
@@ -29,10 +29,7 @@ public enum JmsMessageType {
     Bytes,
     Map,
     Object,
-    /**
-     * TODO Write support for Stream Messages
-     */
-    //Stream,
+    Stream,
     Text,
     
     /**

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java?rev=1403841&r1=1403840&r2=1403841&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java Tue Oct 30 19:46:53 2012
@@ -271,8 +271,8 @@ public class InOutProducer extends SjmsP
                 if (isEndpointTransacted()) {
                     exchange.getUnitOfWork().addSynchronization(new 
SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
                 }
-
-                Message request = 
SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession());
+                
+                Message request = 
SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), 
getSjmsEndpoint().getJmsKeyFormatStrategy());
                 
                 // TODO just set the correlation id don't get it from the
                 // message


Reply via email to