Repository: camel Updated Branches: refs/heads/camel-2.14.x 191a84796 -> f7bbcea7f
fix the package name of atmosphere-websocket/src/test Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c9eb078d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c9eb078d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c9eb078d Branch: refs/heads/camel-2.14.x Commit: c9eb078d57d693b1abe6e62edfbd802f1310abab Parents: 191a847 Author: Akitoshi Yoshida <a...@apache.org> Authored: Mon Sep 22 16:58:24 2014 +0200 Committer: Akitoshi Yoshida <a...@apache.org> Committed: Mon Sep 22 17:01:03 2014 +0200 ---------------------------------------------------------------------- .../websocket/MemoryWebSocketStoreTest.java | 70 ++++++ .../atmosphere/websocket/TestClient.java | 165 ++++++++++++++ .../WebsocketCamelRouterTestSupport.java | 72 +++++++ ...ponentConfigurationAndDocumentationTest.java | 56 +++++ .../websocket/WebsocketRouteTest.java | 216 +++++++++++++++++++ .../wsservlet/MemoryWebSocketStoreTest.java | 70 ------ .../camel/component/wsservlet/TestClient.java | 165 -------------- .../WebsocketCamelRouterTestSupport.java | 72 ------- ...ponentConfigurationAndDocumentationTest.java | 56 ----- .../component/wsservlet/WebsocketRouteTest.java | 216 ------------------- 10 files changed, 579 insertions(+), 579 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c9eb078d/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/MemoryWebSocketStoreTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/MemoryWebSocketStoreTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/MemoryWebSocketStoreTest.java new file mode 100644 index 0000000..8a9b325 --- /dev/null +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/MemoryWebSocketStoreTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmosphere.websocket; + +import java.util.UUID; + +import org.apache.camel.component.atmosphere.websocket.MemoryWebSocketStore; +import org.atmosphere.websocket.WebSocket; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + + +public class MemoryWebSocketStoreTest extends Assert { + + @Test + public void testAddAndRemove() throws Exception { + MemoryWebSocketStore store = new MemoryWebSocketStore(); + WebSocket webSocket1 = EasyMock.createMock(WebSocket.class); + WebSocket webSocket2 = EasyMock.createMock(WebSocket.class); + + String connectionKey1 = UUID.randomUUID().toString(); + String connectionKey2 = UUID.randomUUID().toString(); + + store.addWebSocket(connectionKey1, webSocket1); + verifyGet(store, connectionKey1, webSocket1, true); + assertEquals(1, store.getAllWebSockets().size()); + + store.addWebSocket(connectionKey2, webSocket2); + verifyGet(store, connectionKey2, webSocket2, true); + verifyGet(store, connectionKey1, webSocket1, true); + assertEquals(2, store.getAllWebSockets().size()); + + store.removeWebSocket(connectionKey1); + verifyGet(store, connectionKey1, webSocket1, false); + + store.removeWebSocket(webSocket2); + verifyGet(store, connectionKey2, webSocket2, false); + + assertEquals(0, store.getAllWebSockets().size()); + } + + private void verifyGet(MemoryWebSocketStore store, String ck, WebSocket ws, boolean exists) { + WebSocket aws = store.getWebSocket(ck); + String ack = store.getConnectionKey(ws); + if (exists) { + assertNotNull(aws); + assertEquals(ws, aws); + assertNotNull(ack); + assertEquals(ck, ack); + } else { + assertNull(aws); + assertNull(ack); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c9eb078d/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/TestClient.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/TestClient.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/TestClient.java new file mode 100644 index 0000000..32bccb9 --- /dev/null +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/TestClient.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmosphere.websocket; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.ning.http.client.AsyncHttpClient; +import com.ning.http.client.AsyncHttpClientConfig; +import com.ning.http.client.websocket.WebSocket; +import com.ning.http.client.websocket.WebSocketByteListener; +import com.ning.http.client.websocket.WebSocketTextListener; +import com.ning.http.client.websocket.WebSocketUpgradeHandler; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestClient { + private static final Logger LOG = LoggerFactory.getLogger(TestClient.class); + + private List<Object> received; + private CountDownLatch latch; + private AsyncHttpClient client; + private WebSocket websocket; + private String url; + + public TestClient(String url, AsyncHttpClientConfig conf) { + this(url, conf, 1); + } + + public TestClient(String url, int count) { + this(url, null, count); + } + + public TestClient(String url) { + this(url, null, 1); + } + + public TestClient(String url, AsyncHttpClientConfig conf, int count) { + this.received = new ArrayList<Object>(); + this.latch = new CountDownLatch(count); + this.client = conf == null ? new AsyncHttpClient() : new AsyncHttpClient(conf); + this.url = url; + } + + public void connect() throws InterruptedException, ExecutionException, IOException { + websocket = client.prepareGet(url).execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new TestWebSocketListener()).build()).get(); + } + + public void sendTextMessage(String message) { + websocket.sendTextMessage(message); + } + + public void sendBytesMessage(byte[] message) { + websocket.sendMessage(message); + } + + public boolean await(int secs) throws InterruptedException { + return latch.await(secs, TimeUnit.SECONDS); + } + + public void reset(int count) { + latch = new CountDownLatch(count); + received.clear(); + } + + public List<Object> getReceived() { + return received; + } + + public <T> List<T> getReceived(Class<T> cls) { + List<T> list = new ArrayList<T>(); + for (Object o : received) { + list.add(getValue(o, cls)); + } + return list; + } + + @SuppressWarnings("unchecked") + private static <T> T getValue(Object o, Class<T> cls) { + if (cls.isInstance(o)) { + return (T)o; + } else if (cls == String.class) { + if (o instanceof byte[]) { + return (T)new String((byte[])o); + } else { + return (T)o.toString(); + } + } else if (cls == byte[].class) { + if (o instanceof String) { + return (T)((String)o).getBytes(); + } + } + return null; + } + + public void close() { + websocket.close(); + client.close(); + } + + private class TestWebSocketListener implements WebSocketTextListener, WebSocketByteListener { + + @Override + public void onOpen(WebSocket websocket) { + LOG.info("[ws] opened"); + } + + @Override + public void onClose(WebSocket websocket) { + LOG.info("[ws] closed"); + } + + @Override + public void onError(Throwable t) { + LOG.error("[ws] error", t); + } + + @Override + public void onMessage(byte[] message) { + received.add(message); + LOG.info("[ws] received bytes --> " + message); + latch.countDown(); + } + + @Override + public void onFragment(byte[] fragment, boolean last) { + // TODO Auto-generated method stub + } + + @Override + public void onMessage(String message) { + received.add(message); + LOG.info("[ws] received --> " + message); + latch.countDown(); + } + + @Override + public void onFragment(String fragment, boolean last) { + // TODO Auto-generated method stub + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c9eb078d/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java new file mode 100644 index 0000000..d3f5099 --- /dev/null +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmosphere.websocket; + +import org.apache.camel.component.atmosphere.websocket.CamelWebSocketServlet; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.nio.SelectChannelConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.After; +import org.junit.Before; + +public class WebsocketCamelRouterTestSupport extends CamelTestSupport { + public static final String CONTEXT = "/mycontext"; + public static final String CONTEXT_URL = "http://localhost/mycontext"; + protected static final int PORT = AvailablePortFinder.getNextAvailable(); + protected boolean startCamelContext = true; + + protected Server server; + + @Before + public void setUp() throws Exception { + server = new Server(); + Connector connector = new SelectChannelConnector(); + connector.setHost("localhost"); + connector.setPort(PORT); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath("/"); + server.setHandler(context); + + ServletHolder servletHolder = new ServletHolder(new CamelWebSocketServlet()); + servletHolder.setName("CamelWsServlet"); + context.addServlet(servletHolder, "/*"); + + server.start(); + + if (startCamelContext) { + super.setUp(); + } + } + + @After + public void tearDown() throws Exception { + if (startCamelContext) { + super.tearDown(); + } + + server.stop(); + server.destroy(); + } + + +} http://git-wip-us.apache.org/repos/asf/camel/blob/c9eb078d/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponentConfigurationAndDocumentationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponentConfigurationAndDocumentationTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponentConfigurationAndDocumentationTest.java new file mode 100644 index 0000000..443ad3a --- /dev/null +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponentConfigurationAndDocumentationTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmosphere.websocket; + +import org.apache.camel.CamelContext; +import org.apache.camel.ComponentConfiguration; +import org.apache.camel.EndpointConfiguration; +import org.apache.camel.component.atmosphere.websocket.WebsocketComponent; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class WebsocketComponentConfigurationAndDocumentationTest extends CamelTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testComponentConfiguration() throws Exception { + WebsocketComponent comp = context.getComponent("atmosphere-websocket", WebsocketComponent.class); + EndpointConfiguration conf = comp.createConfiguration("atmosphere-websocket://localhost:8088/hola?sendToAll=true&useStreaming=false"); + + assertEquals("true", conf.getParameter("sendToAll")); + assertEquals("false", conf.getParameter("useStreaming")); + + ComponentConfiguration compConf = comp.createComponentConfiguration(); + String json = compConf.createParameterJsonSchema(); + + assertTrue(json.contains("\"useStreaming\": { \"type\": \"boolean\" }")); + assertTrue(json.contains("\"sendToAll\": { \"type\": \"boolean\" }")); + } + + @Test + public void testComponentDocumentation() throws Exception { + CamelContext context = new DefaultCamelContext(); + String html = context.getComponentDocumentation("atmosphere-websocket"); + assertNotNull("Should have found some auto-generated HTML if on Java 7", html); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/c9eb078d/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java new file mode 100644 index 0000000..f48c651 --- /dev/null +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmosphere.websocket; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.io.StringReader; +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class WebsocketRouteTest extends WebsocketCamelRouterTestSupport { + private static final String RESPONSE_GREETING = "Hola "; + private static final byte[] RESPONSE_GREETING_BYTES = {0x48, 0x6f, 0x6c, 0x61, 0x20}; + + @Test + public void testWebsocketSingleClient() throws Exception { + TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola"); + wsclient.connect(); + + wsclient.sendTextMessage("Cerveza"); + + assertTrue(wsclient.await(10)); + List<String> received = wsclient.getReceived(String.class); + assertEquals(1, received.size()); + assertEquals("Hola Cerveza", received.get(0)); + wsclient.close(); + } + + @Test + public void testWebsocketSingleClientForBytes() throws Exception { + TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola"); + wsclient.connect(); + + wsclient.sendBytesMessage("Cerveza".getBytes("UTF-8")); + + assertTrue(wsclient.await(10)); + List<String> received = wsclient.getReceived(String.class); + assertEquals(1, received.size()); + assertEquals("Hola Cerveza", received.get(0)); + wsclient.close(); + } + + @Test + public void testWebsocketSingleClientForReader() throws Exception { + TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola3"); + wsclient.connect(); + + wsclient.sendTextMessage("Cerveza"); + + assertTrue(wsclient.await(10)); + List<String> received = wsclient.getReceived(String.class); + assertEquals(1, received.size()); + assertEquals("Hola Cerveza", received.get(0)); + wsclient.close(); + } + + @Test + public void testWebsocketSingleClientForInputStream() throws Exception { + TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola3"); + wsclient.connect(); + + wsclient.sendBytesMessage("Cerveza".getBytes("UTF-8")); + + assertTrue(wsclient.await(10)); + List<String> received = wsclient.getReceived(String.class); + assertEquals(1, received.size()); + assertEquals("Hola Cerveza", received.get(0)); + wsclient.close(); + } + + @Test + public void testWebsocketBroadcastClient() throws Exception { + TestClient wsclient1 = new TestClient("ws://localhost:" + PORT + "/hola2", 2); + TestClient wsclient2 = new TestClient("ws://localhost:" + PORT + "/hola2", 2); + wsclient1.connect(); + wsclient2.connect(); + + wsclient1.sendTextMessage("Gambas"); + wsclient2.sendTextMessage("Calamares"); + + assertTrue(wsclient1.await(10)); + assertTrue(wsclient2.await(10)); + + List<String> received1 = wsclient1.getReceived(String.class); + assertEquals(2, received1.size()); + + assertTrue(received1.contains("Hola Gambas")); + assertTrue(received1.contains("Hola Calamares")); + + List<String> received2 = wsclient2.getReceived(String.class); + assertEquals(2, received2.size()); + assertTrue(received2.contains("Hola Gambas")); + assertTrue(received2.contains("Hola Calamares")); + + wsclient1.close(); + wsclient2.close(); + } + + // START SNIPPET: payload + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + // route for a single line + from("atmosphere-websocket:///hola").to("log:info").process(new Processor() { + public void process(final Exchange exchange) throws Exception { + createResponse(exchange, false); + } + }).to("atmosphere-websocket:///hola"); + + // route for a broadcast line + from("atmosphere-websocket:///hola2").to("log:info").process(new Processor() { + public void process(final Exchange exchange) throws Exception { + createResponse(exchange, false); + } + }).to("atmosphere-websocket:///hola2?sendToAll=true"); + + // route for a single stream line + from("atmosphere-websocket:///hola3?useStreaming=true").to("log:info").process(new Processor() { + public void process(final Exchange exchange) throws Exception { + createResponse(exchange, true); + } + }).to("atmosphere-websocket:///hola3"); + + } + }; + } + + private static void createResponse(Exchange exchange, boolean streaming) { + Object msg = exchange.getIn().getBody(); + if (streaming) { + assertTrue("Expects Reader or InputStream", msg instanceof Reader || msg instanceof InputStream); + } else { + assertTrue("Expects String or byte[]", msg instanceof String || msg instanceof byte[]); + } + + if (msg instanceof String) { + exchange.getIn().setBody(RESPONSE_GREETING + msg); + } else if (msg instanceof byte[]) { + exchange.getIn().setBody(createByteResponse((byte[])msg)); + } else if (msg instanceof Reader) { + exchange.getIn().setBody(new StringReader(RESPONSE_GREETING + readAll((Reader)msg))); + } else if (msg instanceof InputStream) { + exchange.getIn().setBody(createByteResponse(readAll((InputStream)msg))); + } + } + + private static byte[] createByteResponse(byte[] req) { + byte[] resp = new byte[req.length + RESPONSE_GREETING_BYTES.length]; + System.arraycopy(RESPONSE_GREETING_BYTES, 0, resp, 0, RESPONSE_GREETING_BYTES.length); + System.arraycopy(req, 0, resp, RESPONSE_GREETING_BYTES.length, req.length); + return resp; + } + + private static String readAll(Reader reader) { + StringBuffer strbuf = new StringBuffer(); + try { + char[] buf = new char[4024]; + int n; + while ((n = reader.read(buf, 0, buf.length)) > 0) { + strbuf.append(buf, 0, n); + } + } catch (IOException e) { + // ignore + } finally { + try { + reader.close(); + } catch (IOException e) { + // ignore + } + } + + return strbuf.toString(); + } + + private static byte[] readAll(InputStream is) { + ByteArrayOutputStream bytebuf = new ByteArrayOutputStream(); + try { + byte[] buf = new byte[4024]; + int n; + while ((n = is.read(buf, 0, buf.length)) > 0) { + bytebuf.write(buf, 0, n); + } + } catch (IOException e) { + // ignore + } finally { + try { + is.close(); + } catch (IOException e) { + // ignore + } + } + + return bytebuf.toByteArray(); + } + // END SNIPPET: payload +} http://git-wip-us.apache.org/repos/asf/camel/blob/c9eb078d/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/MemoryWebSocketStoreTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/MemoryWebSocketStoreTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/MemoryWebSocketStoreTest.java deleted file mode 100644 index 821bdef..0000000 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/MemoryWebSocketStoreTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.wsservlet; - -import java.util.UUID; - -import org.apache.camel.component.atmosphere.websocket.MemoryWebSocketStore; -import org.atmosphere.websocket.WebSocket; -import org.easymock.EasyMock; -import org.junit.Assert; -import org.junit.Test; - - -public class MemoryWebSocketStoreTest extends Assert { - - @Test - public void testAddAndRemove() throws Exception { - MemoryWebSocketStore store = new MemoryWebSocketStore(); - WebSocket webSocket1 = EasyMock.createMock(WebSocket.class); - WebSocket webSocket2 = EasyMock.createMock(WebSocket.class); - - String connectionKey1 = UUID.randomUUID().toString(); - String connectionKey2 = UUID.randomUUID().toString(); - - store.addWebSocket(connectionKey1, webSocket1); - verifyGet(store, connectionKey1, webSocket1, true); - assertEquals(1, store.getAllWebSockets().size()); - - store.addWebSocket(connectionKey2, webSocket2); - verifyGet(store, connectionKey2, webSocket2, true); - verifyGet(store, connectionKey1, webSocket1, true); - assertEquals(2, store.getAllWebSockets().size()); - - store.removeWebSocket(connectionKey1); - verifyGet(store, connectionKey1, webSocket1, false); - - store.removeWebSocket(webSocket2); - verifyGet(store, connectionKey2, webSocket2, false); - - assertEquals(0, store.getAllWebSockets().size()); - } - - private void verifyGet(MemoryWebSocketStore store, String ck, WebSocket ws, boolean exists) { - WebSocket aws = store.getWebSocket(ck); - String ack = store.getConnectionKey(ws); - if (exists) { - assertNotNull(aws); - assertEquals(ws, aws); - assertNotNull(ack); - assertEquals(ck, ack); - } else { - assertNull(aws); - assertNull(ack); - } - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/c9eb078d/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/TestClient.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/TestClient.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/TestClient.java deleted file mode 100644 index 5dbd254..0000000 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/TestClient.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.wsservlet; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import com.ning.http.client.AsyncHttpClient; -import com.ning.http.client.AsyncHttpClientConfig; -import com.ning.http.client.websocket.WebSocket; -import com.ning.http.client.websocket.WebSocketByteListener; -import com.ning.http.client.websocket.WebSocketTextListener; -import com.ning.http.client.websocket.WebSocketUpgradeHandler; - - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestClient { - private static final Logger LOG = LoggerFactory.getLogger(TestClient.class); - - private List<Object> received; - private CountDownLatch latch; - private AsyncHttpClient client; - private WebSocket websocket; - private String url; - - public TestClient(String url, AsyncHttpClientConfig conf) { - this(url, conf, 1); - } - - public TestClient(String url, int count) { - this(url, null, count); - } - - public TestClient(String url) { - this(url, null, 1); - } - - public TestClient(String url, AsyncHttpClientConfig conf, int count) { - this.received = new ArrayList<Object>(); - this.latch = new CountDownLatch(count); - this.client = conf == null ? new AsyncHttpClient() : new AsyncHttpClient(conf); - this.url = url; - } - - public void connect() throws InterruptedException, ExecutionException, IOException { - websocket = client.prepareGet(url).execute( - new WebSocketUpgradeHandler.Builder() - .addWebSocketListener(new TestWebSocketListener()).build()).get(); - } - - public void sendTextMessage(String message) { - websocket.sendTextMessage(message); - } - - public void sendBytesMessage(byte[] message) { - websocket.sendMessage(message); - } - - public boolean await(int secs) throws InterruptedException { - return latch.await(secs, TimeUnit.SECONDS); - } - - public void reset(int count) { - latch = new CountDownLatch(count); - received.clear(); - } - - public List<Object> getReceived() { - return received; - } - - public <T> List<T> getReceived(Class<T> cls) { - List<T> list = new ArrayList<T>(); - for (Object o : received) { - list.add(getValue(o, cls)); - } - return list; - } - - @SuppressWarnings("unchecked") - private static <T> T getValue(Object o, Class<T> cls) { - if (cls.isInstance(o)) { - return (T)o; - } else if (cls == String.class) { - if (o instanceof byte[]) { - return (T)new String((byte[])o); - } else { - return (T)o.toString(); - } - } else if (cls == byte[].class) { - if (o instanceof String) { - return (T)((String)o).getBytes(); - } - } - return null; - } - - public void close() { - websocket.close(); - client.close(); - } - - private class TestWebSocketListener implements WebSocketTextListener, WebSocketByteListener { - - @Override - public void onOpen(WebSocket websocket) { - LOG.info("[ws] opened"); - } - - @Override - public void onClose(WebSocket websocket) { - LOG.info("[ws] closed"); - } - - @Override - public void onError(Throwable t) { - LOG.error("[ws] error", t); - } - - @Override - public void onMessage(byte[] message) { - received.add(message); - LOG.info("[ws] received bytes --> " + message); - latch.countDown(); - } - - @Override - public void onFragment(byte[] fragment, boolean last) { - // TODO Auto-generated method stub - } - - @Override - public void onMessage(String message) { - received.add(message); - LOG.info("[ws] received --> " + message); - latch.countDown(); - } - - @Override - public void onFragment(String fragment, boolean last) { - // TODO Auto-generated method stub - } - - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/c9eb078d/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/WebsocketCamelRouterTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/WebsocketCamelRouterTestSupport.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/WebsocketCamelRouterTestSupport.java deleted file mode 100644 index dbf9288..0000000 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/WebsocketCamelRouterTestSupport.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.wsservlet; - -import org.apache.camel.component.atmosphere.websocket.CamelWebSocketServlet; -import org.apache.camel.test.AvailablePortFinder; -import org.apache.camel.test.junit4.CamelTestSupport; -import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.nio.SelectChannelConnector; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.junit.After; -import org.junit.Before; - -public class WebsocketCamelRouterTestSupport extends CamelTestSupport { - public static final String CONTEXT = "/mycontext"; - public static final String CONTEXT_URL = "http://localhost/mycontext"; - protected static final int PORT = AvailablePortFinder.getNextAvailable(); - protected boolean startCamelContext = true; - - protected Server server; - - @Before - public void setUp() throws Exception { - server = new Server(); - Connector connector = new SelectChannelConnector(); - connector.setHost("localhost"); - connector.setPort(PORT); - server.addConnector(connector); - - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); - context.setContextPath("/"); - server.setHandler(context); - - ServletHolder servletHolder = new ServletHolder(new CamelWebSocketServlet()); - servletHolder.setName("CamelWsServlet"); - context.addServlet(servletHolder, "/*"); - - server.start(); - - if (startCamelContext) { - super.setUp(); - } - } - - @After - public void tearDown() throws Exception { - if (startCamelContext) { - super.tearDown(); - } - - server.stop(); - server.destroy(); - } - - -} http://git-wip-us.apache.org/repos/asf/camel/blob/c9eb078d/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/WebsocketComponentConfigurationAndDocumentationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/WebsocketComponentConfigurationAndDocumentationTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/WebsocketComponentConfigurationAndDocumentationTest.java deleted file mode 100644 index 8b74d43..0000000 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/WebsocketComponentConfigurationAndDocumentationTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.wsservlet; - -import org.apache.camel.CamelContext; -import org.apache.camel.ComponentConfiguration; -import org.apache.camel.EndpointConfiguration; -import org.apache.camel.component.atmosphere.websocket.WebsocketComponent; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.test.junit4.CamelTestSupport; -import org.junit.Test; - -public class WebsocketComponentConfigurationAndDocumentationTest extends CamelTestSupport { - - @Override - public boolean isUseRouteBuilder() { - return false; - } - - @Test - public void testComponentConfiguration() throws Exception { - WebsocketComponent comp = context.getComponent("atmosphere-websocket", WebsocketComponent.class); - EndpointConfiguration conf = comp.createConfiguration("atmosphere-websocket://localhost:8088/hola?sendToAll=true&useStreaming=false"); - - assertEquals("true", conf.getParameter("sendToAll")); - assertEquals("false", conf.getParameter("useStreaming")); - - ComponentConfiguration compConf = comp.createComponentConfiguration(); - String json = compConf.createParameterJsonSchema(); - - assertTrue(json.contains("\"useStreaming\": { \"type\": \"boolean\" }")); - assertTrue(json.contains("\"sendToAll\": { \"type\": \"boolean\" }")); - } - - @Test - public void testComponentDocumentation() throws Exception { - CamelContext context = new DefaultCamelContext(); - String html = context.getComponentDocumentation("atmosphere-websocket"); - assertNotNull("Should have found some auto-generated HTML if on Java 7", html); - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/c9eb078d/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/WebsocketRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/WebsocketRouteTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/WebsocketRouteTest.java deleted file mode 100644 index 68f2dbc..0000000 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/wsservlet/WebsocketRouteTest.java +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.wsservlet; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; -import java.io.StringReader; -import java.util.List; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.builder.RouteBuilder; -import org.junit.Test; - -public class WebsocketRouteTest extends WebsocketCamelRouterTestSupport { - private static final String RESPONSE_GREETING = "Hola "; - private static final byte[] RESPONSE_GREETING_BYTES = {0x48, 0x6f, 0x6c, 0x61, 0x20}; - - @Test - public void testWebsocketSingleClient() throws Exception { - TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola"); - wsclient.connect(); - - wsclient.sendTextMessage("Cerveza"); - - assertTrue(wsclient.await(10)); - List<String> received = wsclient.getReceived(String.class); - assertEquals(1, received.size()); - assertEquals("Hola Cerveza", received.get(0)); - wsclient.close(); - } - - @Test - public void testWebsocketSingleClientForBytes() throws Exception { - TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola"); - wsclient.connect(); - - wsclient.sendBytesMessage("Cerveza".getBytes("UTF-8")); - - assertTrue(wsclient.await(10)); - List<String> received = wsclient.getReceived(String.class); - assertEquals(1, received.size()); - assertEquals("Hola Cerveza", received.get(0)); - wsclient.close(); - } - - @Test - public void testWebsocketSingleClientForReader() throws Exception { - TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola3"); - wsclient.connect(); - - wsclient.sendTextMessage("Cerveza"); - - assertTrue(wsclient.await(10)); - List<String> received = wsclient.getReceived(String.class); - assertEquals(1, received.size()); - assertEquals("Hola Cerveza", received.get(0)); - wsclient.close(); - } - - @Test - public void testWebsocketSingleClientForInputStream() throws Exception { - TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola3"); - wsclient.connect(); - - wsclient.sendBytesMessage("Cerveza".getBytes("UTF-8")); - - assertTrue(wsclient.await(10)); - List<String> received = wsclient.getReceived(String.class); - assertEquals(1, received.size()); - assertEquals("Hola Cerveza", received.get(0)); - wsclient.close(); - } - - @Test - public void testWebsocketBroadcastClient() throws Exception { - TestClient wsclient1 = new TestClient("ws://localhost:" + PORT + "/hola2", 2); - TestClient wsclient2 = new TestClient("ws://localhost:" + PORT + "/hola2", 2); - wsclient1.connect(); - wsclient2.connect(); - - wsclient1.sendTextMessage("Gambas"); - wsclient2.sendTextMessage("Calamares"); - - assertTrue(wsclient1.await(10)); - assertTrue(wsclient2.await(10)); - - List<String> received1 = wsclient1.getReceived(String.class); - assertEquals(2, received1.size()); - - assertTrue(received1.contains("Hola Gambas")); - assertTrue(received1.contains("Hola Calamares")); - - List<String> received2 = wsclient2.getReceived(String.class); - assertEquals(2, received2.size()); - assertTrue(received2.contains("Hola Gambas")); - assertTrue(received2.contains("Hola Calamares")); - - wsclient1.close(); - wsclient2.close(); - } - - // START SNIPPET: payload - protected RouteBuilder createRouteBuilder() { - return new RouteBuilder() { - public void configure() { - // route for a single line - from("atmosphere-websocket:///hola").to("log:info").process(new Processor() { - public void process(final Exchange exchange) throws Exception { - createResponse(exchange, false); - } - }).to("atmosphere-websocket:///hola"); - - // route for a broadcast line - from("atmosphere-websocket:///hola2").to("log:info").process(new Processor() { - public void process(final Exchange exchange) throws Exception { - createResponse(exchange, false); - } - }).to("atmosphere-websocket:///hola2?sendToAll=true"); - - // route for a single stream line - from("atmosphere-websocket:///hola3?useStreaming=true").to("log:info").process(new Processor() { - public void process(final Exchange exchange) throws Exception { - createResponse(exchange, true); - } - }).to("atmosphere-websocket:///hola3"); - - } - }; - } - - private static void createResponse(Exchange exchange, boolean streaming) { - Object msg = exchange.getIn().getBody(); - if (streaming) { - assertTrue("Expects Reader or InputStream", msg instanceof Reader || msg instanceof InputStream); - } else { - assertTrue("Expects String or byte[]", msg instanceof String || msg instanceof byte[]); - } - - if (msg instanceof String) { - exchange.getIn().setBody(RESPONSE_GREETING + msg); - } else if (msg instanceof byte[]) { - exchange.getIn().setBody(createByteResponse((byte[])msg)); - } else if (msg instanceof Reader) { - exchange.getIn().setBody(new StringReader(RESPONSE_GREETING + readAll((Reader)msg))); - } else if (msg instanceof InputStream) { - exchange.getIn().setBody(createByteResponse(readAll((InputStream)msg))); - } - } - - private static byte[] createByteResponse(byte[] req) { - byte[] resp = new byte[req.length + RESPONSE_GREETING_BYTES.length]; - System.arraycopy(RESPONSE_GREETING_BYTES, 0, resp, 0, RESPONSE_GREETING_BYTES.length); - System.arraycopy(req, 0, resp, RESPONSE_GREETING_BYTES.length, req.length); - return resp; - } - - private static String readAll(Reader reader) { - StringBuffer strbuf = new StringBuffer(); - try { - char[] buf = new char[4024]; - int n; - while ((n = reader.read(buf, 0, buf.length)) > 0) { - strbuf.append(buf, 0, n); - } - } catch (IOException e) { - // ignore - } finally { - try { - reader.close(); - } catch (IOException e) { - // ignore - } - } - - return strbuf.toString(); - } - - private static byte[] readAll(InputStream is) { - ByteArrayOutputStream bytebuf = new ByteArrayOutputStream(); - try { - byte[] buf = new byte[4024]; - int n; - while ((n = is.read(buf, 0, buf.length)) > 0) { - bytebuf.write(buf, 0, n); - } - } catch (IOException e) { - // ignore - } finally { - try { - is.close(); - } catch (IOException e) { - // ignore - } - } - - return bytebuf.toByteArray(); - } - // END SNIPPET: payload -}