Author: cmoulliard Date: Thu May 24 09:29:24 2012 New Revision: 1342178 URL: http://svn.apache.org/viewvc?rev=1342178&view=rev Log: CAMEL-5280: Add support for websocket secure --> wss://
Added: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducerConsumer.java camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketSSLContextInUriRouteExampleTest.java camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketSSLRouteExampleTest.java camel/trunk/components/camel-websocket/src/test/resources/jsse/ camel/trunk/components/camel-websocket/src/test/resources/jsse/localhost.ks (with props) Modified: camel/trunk/components/camel-websocket/pom.xml camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConstants.java camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentTest.java camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerTest.java camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketEndpointTest.java camel/trunk/components/camel-websocket/src/test/resources/log4j.properties Modified: camel/trunk/components/camel-websocket/pom.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/pom.xml?rev=1342178&r1=1342177&r2=1342178&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/pom.xml (original) +++ camel/trunk/components/camel-websocket/pom.xml Thu May 24 09:29:24 2012 @@ -60,6 +60,11 @@ <artifactId>jetty-servlet</artifactId> <version>${jetty-version}</version> </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-jmx</artifactId> + <version>${jetty-version}</version> + </dependency> <!-- Unit test --> <dependency> <groupId>org.apache.camel</groupId> @@ -80,9 +85,10 @@ <groupId>com.ning</groupId> <artifactId>async-http-client</artifactId> <version>${ahc-version}</version> - <scope>test</scope> - </dependency> - <!-- logging --> + <!--<version>1.8.0-SNAPSHOT</version> --> + <scope>test</scope> + </dependency> + <!-- logging --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java?rev=1342178&r1=1342177&r2=1342178&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java (original) +++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java Thu May 24 09:29:24 2012 @@ -16,38 +16,87 @@ */ package org.apache.camel.component.websocket; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.InetSocketAddress; +import java.net.URI; +import java.security.GeneralSecurityException; import java.util.HashMap; import java.util.Map; import org.apache.camel.Endpoint; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.impl.DefaultComponent; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.URISupport; +import org.apache.camel.util.UnsafeUriCharactersEncoder; +import org.apache.camel.util.jsse.SSLContextParameters; +import org.eclipse.jetty.http.ssl.SslContextFactory; +import org.eclipse.jetty.jmx.MBeanContainer; +import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.SessionManager; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.server.session.HashSessionManager; import org.eclipse.jetty.server.session.SessionHandler; +import org.eclipse.jetty.server.ssl.SslConnector; +import org.eclipse.jetty.server.ssl.SslSelectChannelConnector; import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.servlet.Servlet; + public class WebsocketComponent extends DefaultComponent { - private static final Logger LOG = LoggerFactory.getLogger(WebsocketComponent.class); + protected static final Logger LOG = LoggerFactory.getLogger(WebsocketComponent.class); + protected static final HashMap<String, ConnectorRef> CONNECTORS = new HashMap<String, ConnectorRef>(); - private ServletContextHandler context; - private Server server; - private String host = WebsocketConstants.DEFAULT_HOST; - private int port = WebsocketConstants.DEFAULT_PORT; - private String staticResources; + protected ServletContextHandler context; + protected SSLContextParameters sslContextParameters; + protected Server server; + protected MBeanContainer mbContainer; + + protected Integer port; + protected String host; + + protected boolean enableJmx; + + protected String staticResources; + protected String sslKeyPassword; + protected String sslPassword; + protected String sslKeystore; + + class ConnectorRef { + Server server; + Connector connector; + Servlet servlet; + int refCount; + + public ConnectorRef(Server server, Connector connector, Servlet servlet) { + this.server = server; + this.connector = connector; + this.servlet = servlet; + increment(); + } - /** - * Map for storing endpoints. Endpoint is identified by remaining part from endpoint URI. - * Eg. <tt>ws://foo?bar=123</tt> and <tt>ws://foo</tt> are referring to the same endpoint. - */ - private Map<String, WebsocketEndpoint> endpoints = new HashMap<String, WebsocketEndpoint>(); + public int increment() { + return ++refCount; + } + + public int decrement() { + return --refCount; + } + + public int getRefCount() { + return refCount; + } + } /** * Map for storing servlets. {@link WebsocketComponentServlet} is identified by pathSpec {@link String}. @@ -59,12 +108,31 @@ public class WebsocketComponent extends @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - WebsocketEndpoint endpoint = endpoints.get(remaining); - if (endpoint == null) { - endpoint = new WebsocketEndpoint(uri, this, remaining); - setProperties(endpoint, parameters); - endpoints.put(remaining, endpoint); + + Map<String, Object> websocketParameters = new HashMap<String, Object>(parameters); + + Boolean enableJmx = getAndRemoveParameter(parameters, "enableJmx", Boolean.class); + SSLContextParameters sslContextParameters = resolveAndRemoveReferenceParameter(parameters, "sslContextParametersRef", SSLContextParameters.class); + int port = extractPortNumber(remaining); + String host = extractHostName(remaining); + + WebsocketEndpoint endpoint = new WebsocketEndpoint(this, uri, remaining, parameters); + + if (enableJmx != null) { + endpoint.setEnableJmx(enableJmx); + } else { + endpoint.setEnableJmx(isEnableJmx()); } + + if (sslContextParameters == null) { + sslContextParameters = this.sslContextParameters; + } + + endpoint.setSslContextParameters(sslContextParameters); + endpoint.setPort(port); + endpoint.setHost(host); + + setProperties(endpoint, parameters); return endpoint; } @@ -97,46 +165,241 @@ public class WebsocketComponent extends this.host = host; } - public int getPort() { + public Integer getPort() { return port; } - public void setPort(int port) { + public void setPort(Integer port) { this.port = port; } - protected Server createServer(ServletContextHandler context, String host, int port, String home) { - InetSocketAddress address = new InetSocketAddress(host, port); - Server server = new Server(address); - - context.setContextPath("/"); - - SessionManager sm = new HashSessionManager(); - SessionHandler sh = new SessionHandler(sm); - context.setSessionHandler(sh); - - if (home != null) { - if (home.startsWith("classpath:")) { - home = ObjectHelper.after(home, "classpath:"); - LOG.debug("Using base resource from classpath: {}", home); - context.setBaseResource(new JettyClassPathResource(getCamelContext().getClassResolver(), home)); + /** + * Connects the URL specified on the endpoint to the specified processor. + */ + public void connect(WebsocketProducerConsumer prodcon) throws Exception { + + Server server = null; + DefaultServlet defaultServlet = null; + String baseResource = null; + WebsocketEndpoint endpoint = prodcon.getEndpoint(); + + String connectorKey = "websocket" + ":" + endpoint.getHost() + ":" + endpoint.getPort(); + + synchronized (CONNECTORS) { + ConnectorRef connectorRef = CONNECTORS.get(connectorKey); + if (connectorRef == null) { + Connector connector; + if (endpoint.getSslContextParameters() != null) { + connector = getSslSocketConnector(endpoint.getSslContextParameters()); + } else { + connector = new SelectChannelConnector(); + } + + if (port != null) { + connector.setPort(port); + } else { + connector.setPort(endpoint.getPort()); + } + + if (host != null) { + connector.setHost(host); + } else { + connector.setHost(endpoint.getHost()); + } + + connector.setHost(endpoint.getHost()); + + // Define Context and SessionManager + context.setContextPath("/"); + + SessionManager sm = new HashSessionManager(); + SessionHandler sh = new SessionHandler(sm); + context.setSessionHandler(sh); + + if (endpoint.getHome() != null) { + if (endpoint.getHome().startsWith("classpath:")) { + baseResource = ObjectHelper.after(endpoint.getHome(), "classpath:"); + LOG.debug("Using base resource from classpath: {}", baseResource); + context.setBaseResource(new JettyClassPathResource(getCamelContext().getClassResolver(), baseResource)); + } else { + LOG.debug("Using base resource: {}", baseResource); + context.setResourceBase(baseResource); + } + defaultServlet = new DefaultServlet(); + ServletHolder holder = new ServletHolder(defaultServlet); + + // avoid file locking on windows + // http://stackoverflow.com/questions/184312/how-to-make-jetty-dynamically-load-static-pages + holder.setInitParameter("useFileMappedBuffer", "false"); + context.addServlet(holder, "/"); + } + + // Create Server and add connector + server = new Server(); + server.addConnector(connector); + server.setHandler(context); + + connectorRef = new ConnectorRef(server, connector, defaultServlet); + CONNECTORS.put(connectorKey, connectorRef); + + server.start(); + } else { - LOG.debug("Using base resource: {}", home); - context.setResourceBase(home); + connectorRef.increment(); } - DefaultServlet defaultServlet = new DefaultServlet(); - ServletHolder holder = new ServletHolder(defaultServlet); - // avoid file locking on windows - // http://stackoverflow.com/questions/184312/how-to-make-jetty-dynamically-load-static-pages - holder.setInitParameter("useFileMappedBuffer", "false"); - context.addServlet(holder, "/"); } - server.setHandler(context); + } + + /** + * Disconnects the URL specified on the endpoint from the specified + * processor. + */ + public void disconnect(WebsocketProducerConsumer prodcon) throws Exception { + + WebsocketEndpoint endpoint = prodcon.getEndpoint(); + String connectorKey = "websocket" + ":" + endpoint.getHost() + ":" + endpoint.getPort(); + + synchronized (CONNECTORS) { + ConnectorRef connectorRef = CONNECTORS.get(connectorKey); + if (connectorRef != null) { + if (connectorRef.decrement() == 0) { + connectorRef.server.removeConnector(connectorRef.connector); + connectorRef.connector.stop(); + connectorRef.server.stop(); + CONNECTORS.remove(CONNECTORS); + } + } + } + + } + + /*protected Server createServer(ServletContextHandler context, String host, int port, String home) { + + String connectorKey = "websocket" + ":" + host + ":" + port; + Server server = null; + DefaultServlet defaultServlet = null; + // WebsocketComponent websocketComponent = (WebsocketComponent) this.getCamelContext().getEndpoint(connectorKey); + + synchronized (CONNECTORS) { + ConnectorRef connectorRef = CONNECTORS.get(connectorKey); + if (connectorRef == null) { + Connector connector; + if (sslContextParameters != null) { + connector = getSslSocketConnector(); + } else { + connector = new SelectChannelConnector(); + } + + connector.setHost(host); + connector.setPort(port); + + // Define Context and SessionManager + context.setContextPath("/"); + + SessionManager sm = new HashSessionManager(); + SessionHandler sh = new SessionHandler(sm); + context.setSessionHandler(sh); + + if (home != null) { + if (home.startsWith("classpath:")) { + home = ObjectHelper.after(home, "classpath:"); + LOG.debug("Using base resource from classpath: {}", home); + context.setBaseResource(new JettyClassPathResource(getCamelContext().getClassResolver(), home)); + } else { + LOG.debug("Using base resource: {}", home); + context.setResourceBase(home); + } + defaultServlet = new DefaultServlet(); + ServletHolder holder = new ServletHolder(defaultServlet); + + // avoid file locking on windows + // http://stackoverflow.com/questions/184312/how-to-make-jetty-dynamically-load-static-pages + holder.setInitParameter("useFileMappedBuffer", "false"); + context.addServlet(holder, "/"); + } + + // Create Server and add connector + server = new Server(); + server.addConnector(connector); + server.setHandler(context); + connectorRef = new ConnectorRef(server, connector, defaultServlet); + + CONNECTORS.put(connectorKey, connectorRef); + + } else { + connectorRef.increment(); + } + + } return server; } +*/ + protected SslConnector getSslSocketConnector(SSLContextParameters sslContextParameters) { + SslSelectChannelConnector sslSocketConnector = null; + if (sslContextParameters != null) { + SslContextFactory sslContextFactory = new WebSocketComponentSslContextFactory(); + try { + sslContextFactory.setSslContext(sslContextParameters.createSSLContext()); + } catch (Exception e) { + throw new RuntimeCamelException("Error initiating SSLContext.", e); + } + sslSocketConnector = new SslSelectChannelConnector(sslContextFactory); + } else { + + sslSocketConnector = new SslSelectChannelConnector(); + // with default null values, jetty ssl system properties + // and console will be read by jetty implementation + sslSocketConnector.getSslContextFactory().setKeyManagerPassword(sslPassword); + sslSocketConnector.getSslContextFactory().setKeyStorePassword(sslKeyPassword); + if (sslKeystore != null) { + sslSocketConnector.getSslContextFactory().setKeyStorePath(sslKeystore); + } + + } + + return sslSocketConnector; + } + + + /** + * Override the key/trust store check method as it does not account for a factory that has + * a pre-configured {@link javax.net.ssl.SSLContext}. + */ + private static final class WebSocketComponentSslContextFactory extends SslContextFactory { + // This method is for Jetty 7.0.x ~ 7.4.x + @SuppressWarnings("unused") + public boolean checkConfig() { + if (getSslContext() == null) { + return checkSSLContextFactoryConfig(this); + } else { + return true; + } + } + + // This method is for Jetty 7.5.x + public void checkKeyStore() { + // here we don't check the SslContext as it is already created + } + } + + private static boolean checkSSLContextFactoryConfig(Object instance) { + try { + Method method = instance.getClass().getMethod("checkConfig"); + return (Boolean) method.invoke(instance); + } catch (NoSuchMethodException ex) { + // ignore + } catch (IllegalArgumentException e) { + // ignore + } catch (IllegalAccessException e) { + // ignore + } catch (InvocationTargetException e) { + // ignore + } + return false; + } public WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketConsumer consumer, String remaining) { String pathSpec = createPathSpec(remaining); @@ -162,26 +425,99 @@ public class WebsocketComponent extends } private static String createPathSpec(String remaining) { - return String.format("/%s/*", remaining); + // Is not correct as it does not support to add port in the URI + //return String.format("/%s/*", remaining); + + int index = remaining.indexOf("/"); + if (index != -1) { + return remaining.substring(index, remaining.length()); + } else { + return "/" + remaining; + } + } + + private static int extractPortNumber(String remaining) { + int index1 = remaining.indexOf(":"); + int index2 = remaining.indexOf("/"); + + if ((index1 != -1) && (index2 != -1)) { + String result = remaining.substring(index1 + 1, index2); + return Integer.parseInt(result); + } else { + return 9292; + } + + } + + private static String extractHostName(String remaining) { + int index = remaining.indexOf(":"); + if (index != -1) { + return remaining.substring(0, index); + } else { + return null; + } + + } + + + public String getSslKeyPassword() { + return sslKeyPassword; + } + + public String getSslPassword() { + return sslPassword; + } + + public String getSslKeystore() { + return sslKeystore; + } + + public void setSslKeyPassword(String sslKeyPassword) { + this.sslKeyPassword = sslKeyPassword; + } + + public void setSslPassword(String sslPassword) { + this.sslPassword = sslPassword; } + public void setSslKeystore(String sslKeystore) { + this.sslKeystore = sslKeystore; + } + + public void setEnableJmx(boolean enableJmx) { + this.enableJmx = enableJmx; + } + + public boolean isEnableJmx() { + return enableJmx; + } + + public SSLContextParameters getSslContextParameters() { + return sslContextParameters; + } + + public void setSslContextParameters(SSLContextParameters sslContextParameters) { + this.sslContextParameters = sslContextParameters; + } + + @Override protected void doStart() throws Exception { super.doStart(); - LOG.info("Starting server {}:{}; static resources: {}", new Object[]{host, port, staticResources}); context = createContext(); - server = createServer(context, host, port, staticResources); - server.start(); + //LOG.info("Starting server {}:{}; static resources: {}", new Object[]{host, port, staticResources}); + //server = createServer(context, host, port, staticResources); } @Override public void doStop() throws Exception { - if (server != null) { - LOG.info("Stopping server {}:{}", host, port); - server.stop(); - server = null; + super.doStop(); + for (ConnectorRef connectorRef : CONNECTORS.values()) { + connectorRef.server.removeConnector(connectorRef.connector); + connectorRef.connector.stop(); + connectorRef.server.stop(); } - endpoints.clear(); + CONNECTORS.clear(); } } Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConstants.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConstants.java?rev=1342178&r1=1342177&r2=1342178&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConstants.java (original) +++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConstants.java Thu May 24 09:29:24 2012 @@ -24,6 +24,9 @@ public final class WebsocketConstants { public static final String CONNECTION_KEY = "websocket.connectionKey"; public static final String SEND_TO_ALL = "websocket.sendToAll"; + public static final String WS_PROTOCOL ="ws"; + public static final String WSS_PROTOCOL ="wss"; + private WebsocketConstants() { }; Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java?rev=1342178&r1=1342177&r2=1342178&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java (original) +++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java Thu May 24 09:29:24 2012 @@ -22,10 +22,29 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; -public class WebsocketConsumer extends DefaultConsumer { +public class WebsocketConsumer extends DefaultConsumer implements WebsocketProducerConsumer { - public WebsocketConsumer(Endpoint endpoint, Processor processor) { + private final WebsocketEndpoint endpoint; + + public WebsocketConsumer(WebsocketEndpoint endpoint, Processor processor) { super(endpoint, processor); + this.endpoint = endpoint; + } + + @Override + public void start() throws Exception { + super.start(); + endpoint.connect(this); + } + + @Override + public void stop() throws Exception { + endpoint.disconnect(this); + super.stop(); + } + + public WebsocketEndpoint getEndpoint() { + return endpoint; } public void sendMessage(final String connectionKey, final String message) { Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java?rev=1342178&r1=1342177&r2=1342178&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java (original) +++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java Thu May 24 09:29:24 2012 @@ -20,30 +20,54 @@ import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; +import org.apache.camel.util.jsse.SSLContextParameters; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; public class WebsocketEndpoint extends DefaultEndpoint { private NodeSynchronization sync; - private String remaining; private WebsocketStore memoryStore; + private WebsocketComponent component; + private SSLContextParameters sslContextParameters; + private URI uri; + private Boolean sendToAll; + private boolean enableJmx; + + private String remaining; + private String host; + // Base Resource for the ServletContextHandler + private String home; - public WebsocketEndpoint(String uri, WebsocketComponent component, String remaining) { + private Integer port; + + public WebsocketEndpoint(WebsocketComponent component, String uri, String remaining, Map<String, Object> parameters) { super(uri, component); this.remaining = remaining; - this.memoryStore = new MemoryWebsocketStore(); this.sync = new DefaultNodeSynchronization(memoryStore); + this.component = component; + try { + this.uri = new URI(uri); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } } @Override public WebsocketComponent getComponent() { + ObjectHelper.notNull(component, "component"); return (WebsocketComponent) super.getComponent(); } @Override public Consumer createConsumer(Processor processor) throws Exception { + ObjectHelper.notNull(component, "component"); WebsocketConsumer consumer = new WebsocketConsumer(this, processor); getComponent().addServlet(sync, consumer, remaining); return consumer; @@ -55,11 +79,47 @@ public class WebsocketEndpoint extends D return new WebsocketProducer(this, memoryStore); } + public void connect(WebsocketProducerConsumer prodcons) throws Exception { + component.connect(prodcons); + } + + public void disconnect(WebsocketProducerConsumer prodcons) throws Exception { + component.disconnect(prodcons); + } + @Override public boolean isSingleton() { return true; } + public URI getUri() { + return uri; + } + + public Integer getPort() { + return port; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public void setPort(int port) { + this.port = port; + } + + public String getHome() { + return home; + } + + public void setHome(String home) { + this.home = home; + } + public Boolean getSendToAll() { return sendToAll; } @@ -68,6 +128,23 @@ public class WebsocketEndpoint extends D this.sendToAll = sendToAll; } + public SSLContextParameters getSslContextParameters() { + return sslContextParameters; + } + + public void setSslContextParameters(SSLContextParameters sslContextParameters) { + this.sslContextParameters = sslContextParameters; + } + + public boolean isEnableJmx() { + return this.enableJmx; + } + + public void setEnableJmx(boolean enableJmx) { + this.enableJmx = enableJmx; + } + + @Override protected void doStart() throws Exception { ServiceHelper.startService(memoryStore); Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java?rev=1342178&r1=1342177&r2=1342178&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java (original) +++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java Thu May 24 09:29:24 2012 @@ -24,14 +24,17 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.impl.DefaultProducer; -public class WebsocketProducer extends DefaultProducer { +public class WebsocketProducer extends DefaultProducer implements WebsocketProducerConsumer { + private final WebsocketStore store; private final Boolean sendToAll; + private final WebsocketEndpoint endpoint; public WebsocketProducer(WebsocketEndpoint endpoint, WebsocketStore store) { super(endpoint); this.store = store; this.sendToAll = endpoint.getSendToAll(); + this.endpoint = endpoint; } @Override @@ -54,6 +57,10 @@ public class WebsocketProducer extends D } } + public WebsocketEndpoint getEndpoint() { + return endpoint; + } + boolean isSendToAllSet(Message in) { // header may be null; have to be careful here (and fallback to use sendToAll option configured from endpoint) Boolean value = in.getHeader(WebsocketConstants.SEND_TO_ALL, sendToAll, Boolean.class); Added: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducerConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducerConsumer.java?rev=1342178&view=auto ============================================================================== --- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducerConsumer.java (added) +++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducerConsumer.java Thu May 24 09:29:24 2012 @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.websocket; + +public interface WebsocketProducerConsumer { + + /** + * Gets the endpoint + */ + WebsocketEndpoint getEndpoint(); +} Modified: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentTest.java?rev=1342178&r1=1342177&r2=1342178&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentTest.java (original) +++ camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentTest.java Thu May 24 09:29:24 2012 @@ -78,8 +78,11 @@ public class WebsocketComponentTest { assertNotNull(handler); } + // TODO - Update tests to use endpoint instead of createServer - chm - 22/05/2012 + /* + @Test - public void testCreateServerWithoutStaticContent() { + public void testCreateServerWithoutStaticContent() throws Exception { ServletContextHandler handler = component.createContext(); Server server = component.createServer(handler, "localhost", 1988, null); assertEquals(1, server.getConnectors().length); @@ -95,11 +98,12 @@ public class WebsocketComponentTest { assertNull(handler.getServletHandler().getHolderEntry("/")); } + @Test - public void testCreateServerWithStaticContent() { + public void testCreateServerWithStaticContent() throws Exception { ServletContextHandler handler = component.createContext(); Server server = component.createServer(handler, "localhost", 1988, "public/"); - assertEquals(1, server.getConnectors().length); + assertEquals(2, server.getConnectors().length); assertEquals("localhost", server.getConnectors()[0].getHost()); assertEquals(1988, server.getConnectors()[0].getPort()); assertFalse(server.getConnectors()[0].isStarted()); @@ -112,6 +116,7 @@ public class WebsocketComponentTest { assertTrue(handler.getResourceBase().endsWith("public")); assertNotNull(handler.getServletHandler().getHolderEntry("/")); } + */ @Test public void testCreateEndpoint() throws Exception { Modified: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerTest.java?rev=1342178&r1=1342177&r2=1342178&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerTest.java (original) +++ camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerTest.java Thu May 24 09:29:24 2012 @@ -45,7 +45,7 @@ public class WebsocketConsumerTest { private static final String MESSAGE = "message"; @Mock - private Endpoint endpoint; + private WebsocketEndpoint endpoint; @Mock private ExceptionHandler exceptionHandler; @Mock Modified: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketEndpointTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketEndpointTest.java?rev=1342178&r1=1342177&r2=1342178&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketEndpointTest.java (original) +++ camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketEndpointTest.java Thu May 24 09:29:24 2012 @@ -57,7 +57,7 @@ public class WebsocketEndpointTest { */ @Before public void setUp() throws Exception { - websocketEndpoint = new WebsocketEndpoint(URI, component, REMAINING); + websocketEndpoint = new WebsocketEndpoint(component, URI, REMAINING, null); } /** Added: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketSSLContextInUriRouteExampleTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketSSLContextInUriRouteExampleTest.java?rev=1342178&view=auto ============================================================================== --- camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketSSLContextInUriRouteExampleTest.java (added) +++ camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketSSLContextInUriRouteExampleTest.java Thu May 24 09:29:24 2012 @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.websocket; + +import com.ning.http.client.AsyncHttpClient; +import com.ning.http.client.AsyncHttpClientConfig; +import com.ning.http.client.websocket.WebSocket; +import com.ning.http.client.websocket.WebSocketTextListener; +import com.ning.http.client.websocket.WebSocketUpgradeHandler; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.camel.util.jsse.KeyManagersParameters; +import org.apache.camel.util.jsse.KeyStoreParameters; +import org.apache.camel.util.jsse.SSLContextParameters; +import org.apache.camel.util.jsse.TrustManagersParameters; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.URL; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class WebsocketSSLContextInUriRouteExampleTest extends CamelTestSupport { + + private static final String NULL_VALUE_MARKER = CamelTestSupport.class.getCanonicalName(); + private static List<String> received = new ArrayList<String>(); + private static CountDownLatch latch = new CountDownLatch(10); + private Properties originalValues = new Properties(); + private String pwd = "changeit"; + private String uriConsumer; + private String uriProducer; + private String server = "127.0.0.1"; + private int port = 8443; + + @Override + @Before + public void setUp() throws Exception { + + URL trustStoreUrl = this.getClass().getClassLoader().getResource("jsse/localhost.ks"); + setSystemProp("javax.net.ssl.trustStore", trustStoreUrl.toURI().getPath()); + uriConsumer = "websocket://" + server + ":" + port + "/test?sslContextParametersRef=#sslContextParameters"; + uriProducer = "websocket://" + server + ":" + port + "/test"; + + super.setUp(); + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + KeyStoreParameters ksp = new KeyStoreParameters(); + ksp.setResource("jsse/localhost.ks"); + ksp.setPassword("changeit"); + + KeyManagersParameters kmp = new KeyManagersParameters(); + kmp.setKeyPassword("changeit"); + kmp.setKeyStore(ksp); + + TrustManagersParameters tmp = new TrustManagersParameters(); + tmp.setKeyStore(ksp); + + SSLContextParameters sslContextParameters = new SSLContextParameters(); + sslContextParameters.setKeyManagers(kmp); + sslContextParameters.setTrustManagers(tmp); + + JndiRegistry registry = super.createRegistry(); + registry.bind("sslContextParameters", sslContextParameters); + return registry; + } + + protected void setSystemProp(String key, String value) { + String originalValue = System.setProperty(key, value); + originalValues.put(key, originalValue != null ? originalValue : NULL_VALUE_MARKER); + } + + protected AsyncHttpClient createAsyncHttpSSLClient() throws IOException, GeneralSecurityException { + + AsyncHttpClient c; + AsyncHttpClientConfig config; + + AsyncHttpClientConfig.Builder builder = + new AsyncHttpClientConfig.Builder(); + + builder.setSSLContext(new SSLContextParameters().createSSLContext()); + config = builder.build(); + c = new AsyncHttpClient(config); + + return c; + } + + protected SSLContextParameters defineSSLContextParameters() { + + KeyStoreParameters ksp = new KeyStoreParameters(); + // ksp.setResource(this.getClass().getClassLoader().getResource("jsse/localhost.ks").toString()); + ksp.setResource("jsse/localhost.ks"); + ksp.setPassword(pwd); + + KeyManagersParameters kmp = new KeyManagersParameters(); + kmp.setKeyPassword(pwd); + kmp.setKeyStore(ksp); + + TrustManagersParameters tmp = new TrustManagersParameters(); + tmp.setKeyStore(ksp); + + SSLContextParameters sslContextParameters = new SSLContextParameters(); + sslContextParameters.setKeyManagers(kmp); + sslContextParameters.setTrustManagers(tmp); + + return sslContextParameters; + } + + @Test + public void testWSHttpCall() throws Exception { + + AsyncHttpClient c = createAsyncHttpSSLClient(); + WebSocket websocket = c.prepareGet("wss://127.0.0.1:8443/test").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketTextListener() { + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onFragment(String fragment, boolean last) { + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + getMockEndpoint("mock:client").expectedBodiesReceived("Hello from WS client"); + + websocket.sendTextMessage("Hello from WS client"); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertMockEndpointsSatisfied(); + + assertEquals(10, received.size()); + for (int i = 0; i < 10; i++) { + assertEquals(">> Welcome on board!", received.get(i)); + } + + websocket.close(); + c.close(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + + from(uriConsumer) + .log(">>> Message received from WebSocket Client : ${body}") + .to("mock:client") + .loop(10) + .setBody().constant(">> Welcome on board!") + .to(uriConsumer); + } + }; + } +} Added: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketSSLRouteExampleTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketSSLRouteExampleTest.java?rev=1342178&view=auto ============================================================================== --- camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketSSLRouteExampleTest.java (added) +++ camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketSSLRouteExampleTest.java Thu May 24 09:29:24 2012 @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.websocket; + +import com.ning.http.client.AsyncHttpClient; +import com.ning.http.client.AsyncHttpClientConfig; +import com.ning.http.client.websocket.WebSocket; +import com.ning.http.client.websocket.WebSocketTextListener; +import com.ning.http.client.websocket.WebSocketUpgradeHandler; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.camel.util.jsse.KeyManagersParameters; +import org.apache.camel.util.jsse.KeyStoreParameters; +import org.apache.camel.util.jsse.SSLContextParameters; +import org.apache.camel.util.jsse.TrustManagersParameters; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.URL; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class WebsocketSSLRouteExampleTest extends CamelTestSupport { + + private static final String NULL_VALUE_MARKER = CamelTestSupport.class.getCanonicalName(); + private static List<String> received = new ArrayList<String>(); + private static CountDownLatch latch = new CountDownLatch(10); + protected Properties originalValues = new Properties(); + protected String pwd = "changeit"; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + URL trustStoreUrl = this.getClass().getClassLoader().getResource("jsse/localhost.ks"); + setSystemProp("javax.net.ssl.trustStore", trustStoreUrl.toURI().getPath()); + } + + protected void setSystemProp(String key, String value) { + String originalValue = System.setProperty(key, value); + originalValues.put(key, originalValue != null ? originalValue : NULL_VALUE_MARKER); + } + + protected AsyncHttpClient createAsyncHttpSSLClient() throws IOException, GeneralSecurityException { + + AsyncHttpClient c; + AsyncHttpClientConfig config; + + AsyncHttpClientConfig.Builder builder = + new AsyncHttpClientConfig.Builder(); + + builder.setSSLContext(new SSLContextParameters().createSSLContext()); + config = builder.build(); + c = new AsyncHttpClient(config); + + return c; + } + + protected SSLContextParameters defineSSLContextParameters() { + + KeyStoreParameters ksp = new KeyStoreParameters(); + // ksp.setResource(this.getClass().getClassLoader().getResource("jsse/localhost.ks").toString()); + ksp.setResource("jsse/localhost.ks"); + ksp.setPassword(pwd); + + KeyManagersParameters kmp = new KeyManagersParameters(); + kmp.setKeyPassword(pwd); + kmp.setKeyStore(ksp); + + TrustManagersParameters tmp = new TrustManagersParameters(); + tmp.setKeyStore(ksp); + + SSLContextParameters sslContextParameters = new SSLContextParameters(); + sslContextParameters.setKeyManagers(kmp); + sslContextParameters.setTrustManagers(tmp); + + return sslContextParameters; + } + + @Test + public void testWSHttpCall() throws Exception { + + AsyncHttpClient c = createAsyncHttpSSLClient(); + WebSocket websocket = c.prepareGet("wss://127.0.0.1:8443/test").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketTextListener() { + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onFragment(String fragment, boolean last) { + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + getMockEndpoint("mock:client").expectedBodiesReceived("Hello from WS client"); + + websocket.sendTextMessage("Hello from WS client"); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertMockEndpointsSatisfied(); + + assertEquals(10, received.size()); + for (int i = 0; i < 10; i++) { + assertEquals(">> Welcome on board!", received.get(i)); + } + + websocket.close(); + c.close(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + + WebsocketComponent websocketComponent = (WebsocketComponent) context.getComponent("websocket"); + websocketComponent.setSslContextParameters(defineSSLContextParameters()); + websocketComponent.setPort(8443); + + from("websocket://test") + .log(">>> Message received from WebSocket Client : ${body}") + .to("mock:client") + .loop(10) + .setBody().constant(">> Welcome on board!") + .to("websocket://test"); + } + }; + } +} Added: camel/trunk/components/camel-websocket/src/test/resources/jsse/localhost.ks URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/resources/jsse/localhost.ks?rev=1342178&view=auto ============================================================================== Binary file - no diff available. Propchange: camel/trunk/components/camel-websocket/src/test/resources/jsse/localhost.ks ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Modified: camel/trunk/components/camel-websocket/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/resources/log4j.properties?rev=1342178&r1=1342177&r2=1342178&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/test/resources/log4j.properties (original) +++ camel/trunk/components/camel-websocket/src/test/resources/log4j.properties Thu May 24 09:29:24 2012 @@ -21,9 +21,10 @@ log4j.rootLogger=INFO, file # uncomment the following line to turn on Camel debugging -#log4j.logger.org.apache.camel.component.websocket=DEBUG -#log4j.logger.org.eclipse.jetty.websocket=DEBUG -#log4j.logger.com.ning.http.client=DEBUG +# log4j.logger.org.apache.camel.component.websocket=DEBUG +# log4j.logger.org.eclipse.jetty.websocket=DEBUG +# log4j.logger.org.eclipse.jetty=DEBUG +# log4j.logger.com.ning.http.client=DEBUG # CONSOLE appender not used by default