Moved WebSocketStore and NodeSynchronization from the Endpoint to the Connector
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2164aba7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2164aba7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2164aba7 Branch: refs/heads/camel-2.16.x Commit: 2164aba73b68692417d95f8d850cdcbf3a54d00b Parents: 4e3ff19 Author: Ton Swieb <ton.sw...@finalist.nl> Authored: Sat Nov 21 21:44:30 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Nov 22 11:36:57 2015 +0100 ---------------------------------------------------------------------- .../component/websocket/WebsocketComponent.java | 25 ++++- .../component/websocket/WebsocketEndpoint.java | 27 +---- .../component/websocket/WebsocketProducer.java | 10 +- .../websocket/WebsocketProducerTest.java | 3 +- ...dividualAndBroadcastEndpointExampleTest.java | 110 +++++++++++++++++++ ...ocketTwoRoutesToSameEndpointExampleTest.java | 109 ++++++++++++++++++ 6 files changed, 249 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java index 55f7df1..1251a53 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java @@ -91,12 +91,14 @@ public class WebsocketComponent extends UriEndpointComponent { Server server; Connector connector; WebsocketComponentServlet servlet; + MemoryWebsocketStore memoryStore; int refCount; - public ConnectorRef(Server server, Connector connector, WebsocketComponentServlet servlet) { + public ConnectorRef(Server server, Connector connector, WebsocketComponentServlet servlet, MemoryWebsocketStore memoryStore) { this.server = server; this.connector = connector; this.servlet = servlet; + this.memoryStore = memoryStore; increment(); } @@ -176,14 +178,17 @@ public class WebsocketComponent extends UriEndpointComponent { server = createStaticResourcesServer(server, context, endpoint.getStaticResources()); } + MemoryWebsocketStore memoryStore = new MemoryWebsocketStore(); + // Don't provide a Servlet object as Producer/Consumer will create them later on - connectorRef = new ConnectorRef(server, connector, null); + connectorRef = new ConnectorRef(server, connector, null, memoryStore); // must enable session before we start if (endpoint.isSessionSupport()) { enableSessionSupport(connectorRef.server, connectorKey); } LOG.info("Jetty Server starting on host: {}:{}", connector.getHost(), connector.getPort()); + connectorRef.memoryStore.start(); connectorRef.server.start(); CONNECTORS.put(connectorKey, connectorRef); @@ -197,7 +202,8 @@ public class WebsocketComponent extends UriEndpointComponent { enableSessionSupport(connectorRef.server, connectorKey); } - WebsocketComponentServlet servlet = addServlet(endpoint.getNodeSynchronization(), prodcon, endpoint.getResourceUri()); + NodeSynchronization sync = new DefaultNodeSynchronization(connectorRef.memoryStore); + WebsocketComponentServlet servlet = addServlet(sync, prodcon, endpoint.getResourceUri()); if (prodcon instanceof WebsocketConsumer) { WebsocketConsumer consumer = WebsocketConsumer.class.cast(prodcon); if (servlet.getConsumer() == null) { @@ -206,7 +212,10 @@ public class WebsocketComponent extends UriEndpointComponent { // register the consumer here servlet.connect(consumer); } - + if (prodcon instanceof WebsocketProducer) { + WebsocketProducer producer = WebsocketProducer.class.cast(prodcon); + producer.setStore(connectorRef.memoryStore); + } } } @@ -234,6 +243,7 @@ public class WebsocketComponent extends UriEndpointComponent { connectorRef.connector.stop(); } connectorRef.server.stop(); + connectorRef.memoryStore.stop(); CONNECTORS.remove(connectorKey); // Camel controls the lifecycle of these entities so remove the // registered MBeans when Camel is done with the managed objects. @@ -245,6 +255,9 @@ public class WebsocketComponent extends UriEndpointComponent { if (prodcon instanceof WebsocketConsumer) { connectorRef.servlet.disconnect((WebsocketConsumer) prodcon); } + if (prodcon instanceof WebsocketProducer) { + ((WebsocketProducer) prodcon).setStore(null); + } } } } @@ -791,7 +804,8 @@ public class WebsocketComponent extends UriEndpointComponent { // must add static resource server to CONNECTORS in case the websocket producers/consumers // uses the same port number, and therefore we must be part of this - ConnectorRef ref = new ConnectorRef(staticResourcesServer, connector, null); + MemoryWebsocketStore memoryStore = new MemoryWebsocketStore(); + ConnectorRef ref = new ConnectorRef(staticResourcesServer, connector, null,memoryStore); String key = "websocket:" + host + ":" + port; CONNECTORS.put(key, ref); } @@ -807,6 +821,7 @@ public class WebsocketComponent extends UriEndpointComponent { connectorRef.server.removeConnector(connectorRef.connector); connectorRef.connector.stop(); connectorRef.server.stop(); + connectorRef.memoryStore.stop(); connectorRef.servlet = null; } CONNECTORS.remove(connectorKey); http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java index ba6f0c5..26100f9 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java @@ -30,15 +30,12 @@ import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.jsse.SSLContextParameters; import org.eclipse.jetty.server.Handler; @UriEndpoint(scheme = "websocket", title = "Jetty Websocket", syntax = "websocket:host:port/resourceUri", consumerClass = WebsocketConsumer.class, label = "websocket") public class WebsocketEndpoint extends DefaultEndpoint { - private NodeSynchronization sync; - private WebsocketStore memoryStore; private WebsocketComponent component; private URI uri; private List<Handler> handlers; @@ -80,8 +77,6 @@ public class WebsocketEndpoint extends DefaultEndpoint { public WebsocketEndpoint(WebsocketComponent component, String uri, String resourceUri, Map<String, Object> parameters) { super(uri, component); this.resourceUri = resourceUri; - this.memoryStore = new MemoryWebsocketStore(); - this.sync = new DefaultNodeSynchronization(memoryStore); this.component = component; try { this.uri = new URI(uri); @@ -106,7 +101,7 @@ public class WebsocketEndpoint extends DefaultEndpoint { @Override public Producer createProducer() throws Exception { - return new WebsocketProducer(this, memoryStore); + return new WebsocketProducer(this); } public void connect(WebsocketConsumer consumer) throws Exception { @@ -335,24 +330,4 @@ public class WebsocketEndpoint extends DefaultEndpoint { public void setResourceUri(String resourceUri) { this.resourceUri = resourceUri; } - - /** - * NodeSynchronization - * @return NodeSynchronization - */ - public NodeSynchronization getNodeSynchronization() { - return this.sync; - } - - @Override - protected void doStart() throws Exception { - ServiceHelper.startService(memoryStore); - super.doStart(); - } - - @Override - protected void doStop() throws Exception { - ServiceHelper.stopService(memoryStore); - super.doStop(); - } } http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java index 18b0775..89b2932 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java @@ -26,13 +26,12 @@ import org.apache.camel.impl.DefaultProducer; public class WebsocketProducer extends DefaultProducer implements WebsocketProducerConsumer { - private final WebsocketStore store; + private WebsocketStore store; private final Boolean sendToAll; private final WebsocketEndpoint endpoint; - public WebsocketProducer(WebsocketEndpoint endpoint, WebsocketStore store) { + public WebsocketProducer(WebsocketEndpoint endpoint) { super(endpoint); - this.store = store; this.sendToAll = endpoint.getSendToAll(); this.endpoint = endpoint; } @@ -111,4 +110,9 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu } } } + + //Store is set/unset upon connect/disconnect of the producer + public void setStore(WebsocketStore store) { + this.store = store; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java index 7a1bcdf..a05f6d8 100644 --- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java +++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java @@ -71,7 +71,8 @@ public class WebsocketProducerTest { @Before public void setUp() throws Exception { - websocketProducer = new WebsocketProducer(endpoint, store); + websocketProducer = new WebsocketProducer(endpoint); + websocketProducer.setStore(store); sockets = Arrays.asList(defaultWebsocket1, defaultWebsocket2); } http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest.java new file mode 100644 index 0000000..b2a3c9d --- /dev/null +++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.websocket; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import com.ning.http.client.AsyncHttpClient; +import com.ning.http.client.ws.WebSocket; +import com.ning.http.client.ws.WebSocketTextListener; +import com.ning.http.client.ws.WebSocketUpgradeHandler; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest extends CamelTestSupport { + + private static List<String> received = new ArrayList<String>(); + private static CountDownLatch latch; + private int port; + + @Override + public void setUp() throws Exception { + port = AvailablePortFinder.getNextAvailable(16310); + super.setUp(); + } + + @Test + public void testWSHttpCallEcho() throws Exception { + + // We call the route WebSocket BAR + received.clear(); + latch = new CountDownLatch(2); + + AsyncHttpClient c = new AsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://localhost:" + port + "/bar").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketTextListener() { + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + websocket.sendMessage("Beer"); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(2, received.size()); + + //Cannot guarantee the order in which messages are received + assertTrue(received.contains("The bar has Beer")); + assertTrue(received.contains("Broadcasting to Bar")); + + websocket.close(); + c.close(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + + from("websocket://localhost:" + port + "/bar") + .log(">>> Message received from BAR WebSocket Client : ${body}") + .transform().simple("The bar has ${body}") + .to("websocket://localhost:" + port + "/bar"); + + from("timer://foo?fixedRate=true&period=12000") + //Use a period which is longer then the latch await time + .setBody(constant("Broadcasting to Bar")) + .log(">>> Broadcasting message to Bar WebSocket Client") + .setHeader(WebsocketConstants.SEND_TO_ALL,constant(true)) + .to("websocket://localhost:" + port + "/bar"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSameEndpointExampleTest.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSameEndpointExampleTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSameEndpointExampleTest.java new file mode 100644 index 0000000..ec089bf --- /dev/null +++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSameEndpointExampleTest.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.websocket; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import com.ning.http.client.AsyncHttpClient; +import com.ning.http.client.ws.WebSocket; +import com.ning.http.client.ws.WebSocketTextListener; +import com.ning.http.client.ws.WebSocketUpgradeHandler; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class WebsocketTwoRoutesToSameEndpointExampleTest extends CamelTestSupport { + + private static List<String> received = new ArrayList<String>(); + private static CountDownLatch latch; + private int port; + + @Override + public void setUp() throws Exception { + port = AvailablePortFinder.getNextAvailable(16310); + super.setUp(); + } + + @Test + public void testWSHttpCallEcho() throws Exception { + + // We call the route WebSocket BAR + received.clear(); + latch = new CountDownLatch(2); + + AsyncHttpClient c = new AsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://localhost:" + port + "/bar").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketTextListener() { + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + websocket.sendMessage("Beer"); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(2, received.size()); + + //Cannot guarantee the order in which messages are received + assertTrue(received.contains("The bar has Beer")); + assertTrue(received.contains("Broadcasting to Bar")); + + websocket.close(); + c.close(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + + from("websocket://localhost:" + port + "/bar") + .log(">>> Message received from BAR WebSocket Client : ${body}") + .transform().simple("The bar has ${body}") + .to("websocket://localhost:" + port + "/bar"); + + from("timer://foo?fixedRate=true&period=12000") + //Use a period which is longer then the latch await time + .setBody(constant("Broadcasting to Bar")) + .log(">>> Broadcasting message to Bar WebSocket Client") + .to("websocket://localhost:" + port + "/bar?sendToAll=true"); + } + }; + } +}