Repository: camel Updated Branches: refs/heads/camel-2.16.x 3f84e4a20 -> 136823a7d refs/heads/master 6fb6be7f6 -> ee3deceb9
CAMEL-9257 route stop/start doesn't work for camel-websocket producer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ee3deceb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ee3deceb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ee3deceb Branch: refs/heads/master Commit: ee3deceb902e944c54a1a87d4ea47e50ad21e47d Parents: 6fb6be7 Author: Tomohisa Igarashi <tm.igara...@gmail.com> Authored: Wed Oct 28 10:15:10 2015 +0000 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Oct 28 11:32:37 2015 +0100 ---------------------------------------------------------------------- .../component/websocket/WebsocketComponent.java | 52 +++----- .../component/websocket/WebsocketEndpoint.java | 12 +- .../WebsocketProducerRouteRestartTest.java | 130 +++++++++++++++++++ 3 files changed, 155 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ee3deceb/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java index cf8af8a..55f7df1 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java @@ -142,8 +142,6 @@ public class WebsocketComponent extends UriEndpointComponent { connector = new SelectChannelConnector(); } - LOG.trace("Jetty Connector added: {}", connector.getName()); - if (endpoint.getPort() != null) { connector.setPort(endpoint.getPort()); } else { @@ -162,6 +160,7 @@ public class WebsocketComponent extends UriEndpointComponent { enableJmx(server); } server.addConnector(connector); + LOG.trace("Jetty Connector added: {}", connector.getName()); // Create ServletContextHandler ServletContextHandler context = createContext(server, connector, endpoint.getHandlers()); @@ -198,6 +197,16 @@ public class WebsocketComponent extends UriEndpointComponent { enableSessionSupport(connectorRef.server, connectorKey); } + WebsocketComponentServlet servlet = addServlet(endpoint.getNodeSynchronization(), prodcon, endpoint.getResourceUri()); + if (prodcon instanceof WebsocketConsumer) { + WebsocketConsumer consumer = WebsocketConsumer.class.cast(prodcon); + if (servlet.getConsumer() == null) { + servlet.setConsumer(consumer); + } + // register the consumer here + servlet.connect(consumer); + } + } } @@ -215,6 +224,10 @@ public class WebsocketComponent extends UriEndpointComponent { ConnectorRef connectorRef = CONNECTORS.get(connectorKey); if (connectorRef != null) { if (connectorRef.decrement() == 0) { + LOG.info("Stopping Jetty Server as the last connector is disconnecting: {}:{}" + , connectorRef.connector.getHost() + , connectorRef.connector.getPort()); + servlets.remove(createPathSpec(endpoint.getResourceUri())); connectorRef.server.removeConnector(connectorRef.connector); if (connectorRef.connector != null) { // static server may not have set a connector @@ -408,35 +421,10 @@ public class WebsocketComponent extends UriEndpointComponent { return createStaticResourcesServer(server, context, home); } - protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketProducer producer, String remaining) throws Exception { + protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketProducerConsumer prodcon, String resourceUri) throws Exception { // Get Connector from one of the Jetty Instances to add WebSocket Servlet - WebsocketEndpoint endpoint = producer.getEndpoint(); - String key = getConnectorKey(endpoint); - ConnectorRef connectorRef = getConnectors().get(key); - - WebsocketComponentServlet servlet; - - if (connectorRef != null) { - String pathSpec = createPathSpec(remaining); - servlet = servlets.get(pathSpec); - if (servlet == null) { - // Retrieve Context - ServletContextHandler context = (ServletContextHandler) connectorRef.server.getHandler(); - servlet = createServlet(sync, pathSpec, servlets, context); - connectorRef.servlet = servlet; - LOG.debug("WebSocket Producer Servlet added for the following path : " + pathSpec + ", to the Jetty Server : " + key); - } - return servlet; - } else { - throw new Exception("Jetty instance has not been retrieved for : " + key); - } - } - - protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketConsumer consumer, String resourceUri) throws Exception { - - // Get Connector from one of the Jetty Instances to add WebSocket Servlet - WebsocketEndpoint endpoint = consumer.getEndpoint(); + WebsocketEndpoint endpoint = prodcon.getEndpoint(); String key = getConnectorKey(endpoint); ConnectorRef connectorRef = getConnectors().get(key); @@ -450,15 +438,9 @@ public class WebsocketComponent extends UriEndpointComponent { ServletContextHandler context = (ServletContextHandler) connectorRef.server.getHandler(); servlet = createServlet(sync, pathSpec, servlets, context); connectorRef.servlet = servlet; - servlets.put(pathSpec, servlet); LOG.debug("WebSocket servlet added for the following path : " + pathSpec + ", to the Jetty Server : " + key); } - if (servlet.getConsumer() == null) { - servlet.setConsumer(consumer); - } - // register the consumer here - servlet.connect(consumer); return servlet; } else { throw new Exception("Jetty instance has not been retrieved for : " + key); http://git-wip-us.apache.org/repos/asf/camel/blob/ee3deceb/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java index ffafdaf..ba6f0c5 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java @@ -111,22 +111,18 @@ public class WebsocketEndpoint extends DefaultEndpoint { public void connect(WebsocketConsumer consumer) throws Exception { component.connect(consumer); - component.addServlet(sync, consumer, resourceUri); } public void disconnect(WebsocketConsumer consumer) throws Exception { component.disconnect(consumer); - // Servlet should be removed } public void connect(WebsocketProducer producer) throws Exception { component.connect(producer); - component.addServlet(sync, producer, resourceUri); } public void disconnect(WebsocketProducer producer) throws Exception { component.disconnect(producer); - // Servlet should be removed } @Override @@ -340,6 +336,14 @@ public class WebsocketEndpoint extends DefaultEndpoint { this.resourceUri = resourceUri; } + /** + * NodeSynchronization + * @return NodeSynchronization + */ + public NodeSynchronization getNodeSynchronization() { + return this.sync; + } + @Override protected void doStart() throws Exception { ServiceHelper.startService(memoryStore); http://git-wip-us.apache.org/repos/asf/camel/blob/ee3deceb/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java new file mode 100644 index 0000000..b1b2ce2 --- /dev/null +++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.websocket; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import com.ning.http.client.AsyncHttpClient; +import com.ning.http.client.ws.WebSocket; +import com.ning.http.client.ws.WebSocketTextListener; +import com.ning.http.client.ws.WebSocketUpgradeHandler; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Before; +import org.junit.Test; + +public class WebsocketProducerRouteRestartTest extends CamelTestSupport { + + private static final String ROUTE_ID = WebsocketProducerRouteRestartTest.class.getSimpleName(); + private static List<Object> received = new ArrayList<Object>(); + private static CountDownLatch latch; + protected int port; + + @Produce(uri = "direct:shop") + private ProducerTemplate producer; + + @Override + @Before + public void setUp() throws Exception { + port = AvailablePortFinder.getNextAvailable(16200); + super.setUp(); + received.clear(); + latch = new CountDownLatch(1); + } + + @Test + public void testWSSuspendResumeRoute() throws Exception { + context.suspendRoute(ROUTE_ID); + context.resumeRoute(ROUTE_ID); + doTestWSHttpCall(); + } + + @Test + public void testWSStopStartRoute() throws Exception { + context.stopRoute(ROUTE_ID); + context.startRoute(ROUTE_ID); + doTestWSHttpCall(); + } + + @Test + public void testWSRemoveAddRoute() throws Exception { + context.removeRoute(ROUTE_ID); + context.addRoutes(createRouteBuilder()); + context.startRoute(ROUTE_ID); + doTestWSHttpCall(); + } + + private void doTestWSHttpCall() throws Exception { + AsyncHttpClient c = new AsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://localhost:" + port + "/shop").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketTextListener() { + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + // Send message to the direct endpoint + producer.sendBodyAndHeader("Beer on stock at Apache Mall", WebsocketConstants.SEND_TO_ALL, "true"); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(1, received.size()); + Object r = received.get(0); + assertTrue(r instanceof String); + assertEquals("Beer on stock at Apache Mall", r); + + websocket.close(); + c.close(); + + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("direct:shop") + .id(ROUTE_ID) + .log(">>> Message received from Shopping center : ${body}") + .to("websocket://localhost:" + port + "/shop"); + } + }; + } +}