Repository: camel
Updated Branches:
  refs/heads/master c98f35bc5 -> ba8eb427c


CAMEL-8070: Supporting byte[] messages in camel-websocket


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ba8eb427
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ba8eb427
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ba8eb427

Branch: refs/heads/master
Commit: ba8eb427c45f9c9e4c5a8a2eaee52c5ad49da930
Parents: c98f35b
Author: Akitoshi Yoshida <a...@apache.org>
Authored: Fri Nov 21 20:20:45 2014 +0100
Committer: Akitoshi Yoshida <a...@apache.org>
Committed: Mon Nov 24 12:19:16 2014 +0100

----------------------------------------------------------------------
 .../component/websocket/DefaultWebsocket.java   | 17 +++++-
 .../component/websocket/WebsocketConsumer.java  |  4 ++
 .../component/websocket/WebsocketProducer.java  | 16 ++++--
 .../websocket/WebsocketConsumerRouteTest.java   | 43 ++++++++++----
 .../WebsocketProducerRouteExampleTest.java      | 59 +++++++++++++++++++-
 .../websocket/WebsocketProducerTest.java        | 18 +++---
 6 files changed, 128 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ba8eb427/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
index 18c2536..60dbec3 100644
--- 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
+++ 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
@@ -20,11 +20,12 @@ import java.io.Serializable;
 import java.util.UUID;
 
 import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
 import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DefaultWebsocket implements WebSocket, OnTextMessage, 
Serializable {
+public class DefaultWebsocket implements WebSocket, OnTextMessage, 
OnBinaryMessage, Serializable {
     private static final long serialVersionUID = 1L;
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultWebsocket.class);
 
@@ -62,6 +63,19 @@ public class DefaultWebsocket implements WebSocket, 
OnTextMessage, Serializable
         }
     }
 
+
+    @Override
+    public void onMessage(byte[] data, int offset, int length) {
+        LOG.debug("onMessage: byte[]");
+        if (this.consumer != null) {
+            byte[] message = new byte[length];
+            System.arraycopy(data, offset, message, 0, length);
+            this.consumer.sendMessage(this.connectionKey, message);
+        } else {
+            LOG.debug("No consumer to handle message received: byte[]");
+        }
+    }
+
     public Connection getConnection() {
         return connection;
     }
@@ -77,5 +91,4 @@ public class DefaultWebsocket implements WebSocket, 
OnTextMessage, Serializable
     public void setConnectionKey(String connectionKey) {
         this.connectionKey = connectionKey;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ba8eb427/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
index b879877..d197e0c 100644
--- 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
+++ 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
@@ -51,6 +51,10 @@ public class WebsocketConsumer extends DefaultConsumer 
implements WebsocketProdu
     }
 
     public void sendMessage(final String connectionKey, final String message) {
+        sendMessage(connectionKey, (Object)message);
+    }
+
+    public void sendMessage(final String connectionKey, final Object message) {
 
         final Exchange exchange = getEndpoint().createExchange();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/ba8eb427/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
index 7dd7d08..b00fc79 100644
--- 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
+++ 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
@@ -40,8 +40,10 @@ public class WebsocketProducer extends DefaultProducer 
implements WebsocketProdu
     @Override
     public void process(Exchange exchange) throws Exception {
         Message in = exchange.getIn();
-        String message = in.getMandatoryBody(String.class);
-
+        Object message = in.getMandatoryBody();
+        if (!(message == null || message instanceof String || message 
instanceof byte[])) {
+            message = in.getMandatoryBody(String.class);
+        }
         if (isSendToAllSet(in)) {
             sendToAll(store, message, exchange);
         } else {
@@ -80,7 +82,7 @@ public class WebsocketProducer extends DefaultProducer 
implements WebsocketProdu
         return value == null ? false : value;
     }
 
-    void sendToAll(WebsocketStore store, String message, Exchange exchange) 
throws Exception {
+    void sendToAll(WebsocketStore store, Object message, Exchange exchange) 
throws Exception {
         log.debug("Sending to all {}", message);
         Collection<DefaultWebsocket> websockets = store.getAll();
         Exception exception = null;
@@ -98,11 +100,15 @@ public class WebsocketProducer extends DefaultProducer 
implements WebsocketProdu
         }
     }
 
-    void sendMessage(DefaultWebsocket websocket, String message) throws 
IOException {
+    void sendMessage(DefaultWebsocket websocket, Object message) throws 
IOException {
         // in case there is web socket and socket connection is open - send 
message
         if (websocket != null && websocket.getConnection().isOpen()) {
             log.trace("Sending to websocket {} -> {}", 
websocket.getConnectionKey(), message);
-            websocket.getConnection().sendMessage(message);
+            if (message instanceof String) {
+                websocket.getConnection().sendMessage((String)message);
+            } else if (message instanceof byte[]) {
+                websocket.getConnection().sendMessage((byte[])message, 0, 
((byte[])message).length);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ba8eb427/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerRouteTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerRouteTest.java
 
b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerRouteTest.java
index 64693dd..6422964 100644
--- 
a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerRouteTest.java
+++ 
b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerRouteTest.java
@@ -18,9 +18,8 @@ package org.apache.camel.component.websocket;
 
 import com.ning.http.client.AsyncHttpClient;
 import com.ning.http.client.websocket.WebSocket;
-import com.ning.http.client.websocket.WebSocketTextListener;
+import com.ning.http.client.websocket.WebSocketListener;
 import com.ning.http.client.websocket.WebSocketUpgradeHandler;
-
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.AvailablePortFinder;
@@ -45,17 +44,40 @@ public class WebsocketConsumerRouteTest extends 
CamelTestSupport {
 
         WebSocket websocket = c.prepareGet("ws://127.0.0.1:" + port + 
"/echo").execute(
             new WebSocketUpgradeHandler.Builder()
-                .addWebSocketListener(new WebSocketTextListener() {
+                .addWebSocketListener(new WebSocketListener() {
                     @Override
-                    public void onMessage(String message) {
-                        
+                    public void onOpen(WebSocket websocket) {
                     }
 
                     @Override
-                    public void onFragment(String fragment, boolean last) {
+                    public void onClose(WebSocket websocket) {
                     }
 
                     @Override
+                    public void onError(Throwable t) {
+                        t.printStackTrace();
+                    }
+                }).build()).get();
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedBodiesReceived("Test");
+
+        websocket.sendTextMessage("Test");
+
+        result.assertIsSatisfied();
+        
+        websocket.close();
+        c.close();
+    }
+
+    @Test
+    public void testWSBytesHttpCall() throws Exception {
+        AsyncHttpClient c = new AsyncHttpClient();
+
+        WebSocket websocket = c.prepareGet("ws://127.0.0.1:" + port + 
"/echo").execute(
+            new WebSocketUpgradeHandler.Builder()
+                .addWebSocketListener(new WebSocketListener() {
+                    @Override
                     public void onOpen(WebSocket websocket) {
                     }
 
@@ -68,14 +90,15 @@ public class WebsocketConsumerRouteTest extends 
CamelTestSupport {
                         t.printStackTrace();
                     }
                 }).build()).get();
-        
+
         MockEndpoint result = getMockEndpoint("mock:result");
-        result.expectedBodiesReceived("Test");
+        final byte[] testmessage = "Test".getBytes("utf-8");
+        result.expectedBodiesReceived(testmessage);
 
-        websocket.sendTextMessage("Test");
+        websocket.sendMessage(testmessage);
 
         result.assertIsSatisfied();
-        
+
         websocket.close();
         c.close();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/ba8eb427/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java
 
b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java
index ae2f2d7..bd5e9ba 100644
--- 
a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java
+++ 
b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.ning.http.client.AsyncHttpClient;
 import com.ning.http.client.websocket.WebSocket;
+import com.ning.http.client.websocket.WebSocketByteListener;
 import com.ning.http.client.websocket.WebSocketTextListener;
 import com.ning.http.client.websocket.WebSocketUpgradeHandler;
 import org.apache.camel.Produce;
@@ -35,8 +36,8 @@ import org.junit.Test;
 
 public class WebsocketProducerRouteExampleTest extends CamelTestSupport {
 
-    private static List<String> received = new ArrayList<String>();
-    private static CountDownLatch latch = new CountDownLatch(1);
+    private static List<Object> received = new ArrayList<Object>();
+    private static CountDownLatch latch;
     protected int port;
 
     @Produce(uri = "direct:shop")
@@ -47,6 +48,8 @@ public class WebsocketProducerRouteExampleTest extends 
CamelTestSupport {
     public void setUp() throws Exception {
         port = AvailablePortFinder.getNextAvailable(16200);
         super.setUp();
+        received.clear();
+        latch =  new CountDownLatch(1);
     }
 
     @Test
@@ -87,7 +90,57 @@ public class WebsocketProducerRouteExampleTest extends 
CamelTestSupport {
         assertTrue(latch.await(10, TimeUnit.SECONDS));
 
         assertEquals(1, received.size());
-        assertEquals("Beer on stock at Apache Mall", received.get(0));
+        Object r = received.get(0);
+        assertTrue(r instanceof String);
+        assertEquals("Beer on stock at Apache Mall", r);
+
+        websocket.close();
+        c.close();
+    }
+
+    @Test
+    public void testWSBytesHttpCall() throws Exception {
+        AsyncHttpClient c = new AsyncHttpClient();
+
+        WebSocket websocket = c.prepareGet("ws://localhost:" + port + 
"/shop").execute(
+            new WebSocketUpgradeHandler.Builder()
+                .addWebSocketListener(new WebSocketByteListener() {
+
+                    @Override
+                    public void onMessage(byte[] message) {
+                        received.add(message);
+                        log.info("received --> " + message);
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void onFragment(byte[] fragment, boolean last) {
+                    }
+
+                    @Override
+                    public void onOpen(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onClose(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                        t.printStackTrace();
+                    }
+                }).build()).get();
+
+        // Send message to the direct endpoint
+        byte[] testmessage = "Beer on stock at Apache Mall".getBytes("utf-8");
+        producer.sendBodyAndHeader(testmessage, 
WebsocketConstants.SEND_TO_ALL, "true");
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        assertEquals(1, received.size());
+        Object r = received.get(0);
+        assertTrue(r instanceof byte[]);
+        assertArrayEquals(testmessage, (byte[])r);
 
         websocket.close();
         c.close();

http://git-wip-us.apache.org/repos/asf/camel/blob/ba8eb427/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
 
b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
index a866df0..7a1bcdf 100644
--- 
a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
+++ 
b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
@@ -78,7 +78,7 @@ public class WebsocketProducerTest {
     @Test
     public void testProcessSingleMessage() throws Exception {
         when(exchange.getIn()).thenReturn(inMessage);
-        when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE);
+        when(inMessage.getMandatoryBody()).thenReturn(MESSAGE);
         when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, 
Boolean.class)).thenReturn(false);
         when(inMessage.getHeader(WebsocketConstants.CONNECTION_KEY, 
String.class)).thenReturn(SESSION_KEY);
         when(store.get(SESSION_KEY)).thenReturn(defaultWebsocket1);
@@ -89,7 +89,7 @@ public class WebsocketProducerTest {
 
         InOrder inOrder = inOrder(endpoint, store, connection, 
defaultWebsocket1, defaultWebsocket2, exchange, inMessage);
         inOrder.verify(exchange, times(1)).getIn();
-        inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class);
+        inOrder.verify(inMessage, times(1)).getMandatoryBody();
         inOrder.verify(inMessage, 
times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class);
         inOrder.verify(inMessage, 
times(1)).getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
         inOrder.verify(store, times(1)).get(SESSION_KEY);
@@ -103,7 +103,7 @@ public class WebsocketProducerTest {
     @Test
     public void testProcessSingleMessageWithException() throws Exception {
         when(exchange.getIn()).thenReturn(inMessage);
-        when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE);
+        when(inMessage.getMandatoryBody()).thenReturn(MESSAGE);
         when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, 
Boolean.class)).thenReturn(false);
         when(inMessage.getHeader(WebsocketConstants.CONNECTION_KEY, 
String.class)).thenReturn(SESSION_KEY);
         when(store.get(SESSION_KEY)).thenReturn(defaultWebsocket1);
@@ -120,7 +120,7 @@ public class WebsocketProducerTest {
 
         InOrder inOrder = inOrder(endpoint, store, connection, 
defaultWebsocket1, defaultWebsocket2, exchange, inMessage);
         inOrder.verify(exchange, times(1)).getIn();
-        inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class);
+        inOrder.verify(inMessage, times(1)).getMandatoryBody();
         inOrder.verify(inMessage, 
times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class);
         inOrder.verify(inMessage, 
times(1)).getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
         inOrder.verify(store, times(1)).get(SESSION_KEY);
@@ -134,7 +134,7 @@ public class WebsocketProducerTest {
     @Test
     public void testProcessMultipleMessages() throws Exception {
         when(exchange.getIn()).thenReturn(inMessage);
-        when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE);
+        when(inMessage.getMandatoryBody()).thenReturn(MESSAGE);
         when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, 
Boolean.class)).thenReturn(true);
         when(store.getAll()).thenReturn(sockets);
         when(defaultWebsocket1.getConnection()).thenReturn(connection);
@@ -145,7 +145,7 @@ public class WebsocketProducerTest {
 
         InOrder inOrder = inOrder(endpoint, store, connection, 
defaultWebsocket1, defaultWebsocket2, exchange, inMessage);
         inOrder.verify(exchange, times(1)).getIn();
-        inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class);
+        inOrder.verify(inMessage, times(1)).getMandatoryBody();
         inOrder.verify(inMessage, 
times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class);
         inOrder.verify(store, times(1)).getAll();
         inOrder.verify(defaultWebsocket1, times(1)).getConnection();
@@ -162,7 +162,7 @@ public class WebsocketProducerTest {
     @Test
     public void testProcessMultipleMessagesWithException() throws Exception {
         when(exchange.getIn()).thenReturn(inMessage);
-        when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE);
+        when(inMessage.getMandatoryBody()).thenReturn(MESSAGE);
         when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, 
Boolean.class)).thenReturn(true);
         when(store.getAll()).thenReturn(sockets);
         when(defaultWebsocket1.getConnection()).thenReturn(connection);
@@ -179,7 +179,7 @@ public class WebsocketProducerTest {
 
         InOrder inOrder = inOrder(endpoint, store, connection, 
defaultWebsocket1, defaultWebsocket2, exchange, inMessage);
         inOrder.verify(exchange, times(1)).getIn();
-        inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class);
+        inOrder.verify(inMessage, times(1)).getMandatoryBody();
         inOrder.verify(inMessage, 
times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class);
         inOrder.verify(store, times(1)).getAll();
         inOrder.verify(defaultWebsocket1, times(1)).getConnection();
@@ -211,7 +211,7 @@ public class WebsocketProducerTest {
 
         InOrder inOrder = inOrder(endpoint, store, connection, 
defaultWebsocket1, defaultWebsocket2, exchange, inMessage);
         inOrder.verify(exchange, times(1)).getIn();
-        inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class);
+        inOrder.verify(inMessage, times(1)).getMandatoryBody();
         inOrder.verify(inMessage, 
times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class);
         inOrder.verify(inMessage, 
times(1)).getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
         inOrder.verifyNoMoreInteractions();

Reply via email to