Repository: camel Updated Branches: refs/heads/master c98f35bc5 -> ba8eb427c
CAMEL-8070: Supporting byte[] messages in camel-websocket Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ba8eb427 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ba8eb427 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ba8eb427 Branch: refs/heads/master Commit: ba8eb427c45f9c9e4c5a8a2eaee52c5ad49da930 Parents: c98f35b Author: Akitoshi Yoshida <a...@apache.org> Authored: Fri Nov 21 20:20:45 2014 +0100 Committer: Akitoshi Yoshida <a...@apache.org> Committed: Mon Nov 24 12:19:16 2014 +0100 ---------------------------------------------------------------------- .../component/websocket/DefaultWebsocket.java | 17 +++++- .../component/websocket/WebsocketConsumer.java | 4 ++ .../component/websocket/WebsocketProducer.java | 16 ++++-- .../websocket/WebsocketConsumerRouteTest.java | 43 ++++++++++---- .../WebsocketProducerRouteExampleTest.java | 59 +++++++++++++++++++- .../websocket/WebsocketProducerTest.java | 18 +++--- 6 files changed, 128 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ba8eb427/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java index 18c2536..60dbec3 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java @@ -20,11 +20,12 @@ import java.io.Serializable; import java.util.UUID; import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage; import org.eclipse.jetty.websocket.WebSocket.OnTextMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DefaultWebsocket implements WebSocket, OnTextMessage, Serializable { +public class DefaultWebsocket implements WebSocket, OnTextMessage, OnBinaryMessage, Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(DefaultWebsocket.class); @@ -62,6 +63,19 @@ public class DefaultWebsocket implements WebSocket, OnTextMessage, Serializable } } + + @Override + public void onMessage(byte[] data, int offset, int length) { + LOG.debug("onMessage: byte[]"); + if (this.consumer != null) { + byte[] message = new byte[length]; + System.arraycopy(data, offset, message, 0, length); + this.consumer.sendMessage(this.connectionKey, message); + } else { + LOG.debug("No consumer to handle message received: byte[]"); + } + } + public Connection getConnection() { return connection; } @@ -77,5 +91,4 @@ public class DefaultWebsocket implements WebSocket, OnTextMessage, Serializable public void setConnectionKey(String connectionKey) { this.connectionKey = connectionKey; } - } http://git-wip-us.apache.org/repos/asf/camel/blob/ba8eb427/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java index b879877..d197e0c 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java @@ -51,6 +51,10 @@ public class WebsocketConsumer extends DefaultConsumer implements WebsocketProdu } public void sendMessage(final String connectionKey, final String message) { + sendMessage(connectionKey, (Object)message); + } + + public void sendMessage(final String connectionKey, final Object message) { final Exchange exchange = getEndpoint().createExchange(); http://git-wip-us.apache.org/repos/asf/camel/blob/ba8eb427/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java index 7dd7d08..b00fc79 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java @@ -40,8 +40,10 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu @Override public void process(Exchange exchange) throws Exception { Message in = exchange.getIn(); - String message = in.getMandatoryBody(String.class); - + Object message = in.getMandatoryBody(); + if (!(message == null || message instanceof String || message instanceof byte[])) { + message = in.getMandatoryBody(String.class); + } if (isSendToAllSet(in)) { sendToAll(store, message, exchange); } else { @@ -80,7 +82,7 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu return value == null ? false : value; } - void sendToAll(WebsocketStore store, String message, Exchange exchange) throws Exception { + void sendToAll(WebsocketStore store, Object message, Exchange exchange) throws Exception { log.debug("Sending to all {}", message); Collection<DefaultWebsocket> websockets = store.getAll(); Exception exception = null; @@ -98,11 +100,15 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu } } - void sendMessage(DefaultWebsocket websocket, String message) throws IOException { + void sendMessage(DefaultWebsocket websocket, Object message) throws IOException { // in case there is web socket and socket connection is open - send message if (websocket != null && websocket.getConnection().isOpen()) { log.trace("Sending to websocket {} -> {}", websocket.getConnectionKey(), message); - websocket.getConnection().sendMessage(message); + if (message instanceof String) { + websocket.getConnection().sendMessage((String)message); + } else if (message instanceof byte[]) { + websocket.getConnection().sendMessage((byte[])message, 0, ((byte[])message).length); + } } } } http://git-wip-us.apache.org/repos/asf/camel/blob/ba8eb427/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerRouteTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerRouteTest.java index 64693dd..6422964 100644 --- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerRouteTest.java +++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerRouteTest.java @@ -18,9 +18,8 @@ package org.apache.camel.component.websocket; import com.ning.http.client.AsyncHttpClient; import com.ning.http.client.websocket.WebSocket; -import com.ning.http.client.websocket.WebSocketTextListener; +import com.ning.http.client.websocket.WebSocketListener; import com.ning.http.client.websocket.WebSocketUpgradeHandler; - import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.AvailablePortFinder; @@ -45,17 +44,40 @@ public class WebsocketConsumerRouteTest extends CamelTestSupport { WebSocket websocket = c.prepareGet("ws://127.0.0.1:" + port + "/echo").execute( new WebSocketUpgradeHandler.Builder() - .addWebSocketListener(new WebSocketTextListener() { + .addWebSocketListener(new WebSocketListener() { @Override - public void onMessage(String message) { - + public void onOpen(WebSocket websocket) { } @Override - public void onFragment(String fragment, boolean last) { + public void onClose(WebSocket websocket) { } @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedBodiesReceived("Test"); + + websocket.sendTextMessage("Test"); + + result.assertIsSatisfied(); + + websocket.close(); + c.close(); + } + + @Test + public void testWSBytesHttpCall() throws Exception { + AsyncHttpClient c = new AsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://127.0.0.1:" + port + "/echo").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketListener() { + @Override public void onOpen(WebSocket websocket) { } @@ -68,14 +90,15 @@ public class WebsocketConsumerRouteTest extends CamelTestSupport { t.printStackTrace(); } }).build()).get(); - + MockEndpoint result = getMockEndpoint("mock:result"); - result.expectedBodiesReceived("Test"); + final byte[] testmessage = "Test".getBytes("utf-8"); + result.expectedBodiesReceived(testmessage); - websocket.sendTextMessage("Test"); + websocket.sendMessage(testmessage); result.assertIsSatisfied(); - + websocket.close(); c.close(); } http://git-wip-us.apache.org/repos/asf/camel/blob/ba8eb427/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java index ae2f2d7..bd5e9ba 100644 --- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java +++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import com.ning.http.client.AsyncHttpClient; import com.ning.http.client.websocket.WebSocket; +import com.ning.http.client.websocket.WebSocketByteListener; import com.ning.http.client.websocket.WebSocketTextListener; import com.ning.http.client.websocket.WebSocketUpgradeHandler; import org.apache.camel.Produce; @@ -35,8 +36,8 @@ import org.junit.Test; public class WebsocketProducerRouteExampleTest extends CamelTestSupport { - private static List<String> received = new ArrayList<String>(); - private static CountDownLatch latch = new CountDownLatch(1); + private static List<Object> received = new ArrayList<Object>(); + private static CountDownLatch latch; protected int port; @Produce(uri = "direct:shop") @@ -47,6 +48,8 @@ public class WebsocketProducerRouteExampleTest extends CamelTestSupport { public void setUp() throws Exception { port = AvailablePortFinder.getNextAvailable(16200); super.setUp(); + received.clear(); + latch = new CountDownLatch(1); } @Test @@ -87,7 +90,57 @@ public class WebsocketProducerRouteExampleTest extends CamelTestSupport { assertTrue(latch.await(10, TimeUnit.SECONDS)); assertEquals(1, received.size()); - assertEquals("Beer on stock at Apache Mall", received.get(0)); + Object r = received.get(0); + assertTrue(r instanceof String); + assertEquals("Beer on stock at Apache Mall", r); + + websocket.close(); + c.close(); + } + + @Test + public void testWSBytesHttpCall() throws Exception { + AsyncHttpClient c = new AsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://localhost:" + port + "/shop").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketByteListener() { + + @Override + public void onMessage(byte[] message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onFragment(byte[] fragment, boolean last) { + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + // Send message to the direct endpoint + byte[] testmessage = "Beer on stock at Apache Mall".getBytes("utf-8"); + producer.sendBodyAndHeader(testmessage, WebsocketConstants.SEND_TO_ALL, "true"); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(1, received.size()); + Object r = received.get(0); + assertTrue(r instanceof byte[]); + assertArrayEquals(testmessage, (byte[])r); websocket.close(); c.close(); http://git-wip-us.apache.org/repos/asf/camel/blob/ba8eb427/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java index a866df0..7a1bcdf 100644 --- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java +++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java @@ -78,7 +78,7 @@ public class WebsocketProducerTest { @Test public void testProcessSingleMessage() throws Exception { when(exchange.getIn()).thenReturn(inMessage); - when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE); + when(inMessage.getMandatoryBody()).thenReturn(MESSAGE); when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(false); when(inMessage.getHeader(WebsocketConstants.CONNECTION_KEY, String.class)).thenReturn(SESSION_KEY); when(store.get(SESSION_KEY)).thenReturn(defaultWebsocket1); @@ -89,7 +89,7 @@ public class WebsocketProducerTest { InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage); inOrder.verify(exchange, times(1)).getIn(); - inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class); + inOrder.verify(inMessage, times(1)).getMandatoryBody(); inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class); inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.CONNECTION_KEY, String.class); inOrder.verify(store, times(1)).get(SESSION_KEY); @@ -103,7 +103,7 @@ public class WebsocketProducerTest { @Test public void testProcessSingleMessageWithException() throws Exception { when(exchange.getIn()).thenReturn(inMessage); - when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE); + when(inMessage.getMandatoryBody()).thenReturn(MESSAGE); when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(false); when(inMessage.getHeader(WebsocketConstants.CONNECTION_KEY, String.class)).thenReturn(SESSION_KEY); when(store.get(SESSION_KEY)).thenReturn(defaultWebsocket1); @@ -120,7 +120,7 @@ public class WebsocketProducerTest { InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage); inOrder.verify(exchange, times(1)).getIn(); - inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class); + inOrder.verify(inMessage, times(1)).getMandatoryBody(); inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class); inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.CONNECTION_KEY, String.class); inOrder.verify(store, times(1)).get(SESSION_KEY); @@ -134,7 +134,7 @@ public class WebsocketProducerTest { @Test public void testProcessMultipleMessages() throws Exception { when(exchange.getIn()).thenReturn(inMessage); - when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE); + when(inMessage.getMandatoryBody()).thenReturn(MESSAGE); when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(true); when(store.getAll()).thenReturn(sockets); when(defaultWebsocket1.getConnection()).thenReturn(connection); @@ -145,7 +145,7 @@ public class WebsocketProducerTest { InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage); inOrder.verify(exchange, times(1)).getIn(); - inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class); + inOrder.verify(inMessage, times(1)).getMandatoryBody(); inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class); inOrder.verify(store, times(1)).getAll(); inOrder.verify(defaultWebsocket1, times(1)).getConnection(); @@ -162,7 +162,7 @@ public class WebsocketProducerTest { @Test public void testProcessMultipleMessagesWithException() throws Exception { when(exchange.getIn()).thenReturn(inMessage); - when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE); + when(inMessage.getMandatoryBody()).thenReturn(MESSAGE); when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(true); when(store.getAll()).thenReturn(sockets); when(defaultWebsocket1.getConnection()).thenReturn(connection); @@ -179,7 +179,7 @@ public class WebsocketProducerTest { InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage); inOrder.verify(exchange, times(1)).getIn(); - inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class); + inOrder.verify(inMessage, times(1)).getMandatoryBody(); inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class); inOrder.verify(store, times(1)).getAll(); inOrder.verify(defaultWebsocket1, times(1)).getConnection(); @@ -211,7 +211,7 @@ public class WebsocketProducerTest { InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage); inOrder.verify(exchange, times(1)).getIn(); - inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class); + inOrder.verify(inMessage, times(1)).getMandatoryBody(); inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class); inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.CONNECTION_KEY, String.class); inOrder.verifyNoMoreInteractions();