Repository: camel Updated Branches: refs/heads/master 7b67a04ed -> 6208c5afb
CAMEL-7316: refactor camel-ws using the refactored camel-ahc Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6208c5af Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6208c5af Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6208c5af Branch: refs/heads/master Commit: 6208c5afb2809987a84dafe654314783fac81250 Parents: 7b67a04 Author: Akitoshi Yoshida <a...@apache.org> Authored: Tue Mar 25 15:20:00 2014 +0100 Committer: Akitoshi Yoshida <a...@apache.org> Committed: Tue Mar 25 15:20:00 2014 +0100 ---------------------------------------------------------------------- components/camel-ws/pom.xml | 16 +-- .../apache/camel/component/ws/WsComponent.java | 128 +++--------------- .../apache/camel/component/ws/WsEndpoint.java | 130 ++++--------------- 3 files changed, 42 insertions(+), 232 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6208c5af/components/camel-ws/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-ws/pom.xml b/components/camel-ws/pom.xml index 469977f..a5b7b41 100644 --- a/components/camel-ws/pom.xml +++ b/components/camel-ws/pom.xml @@ -41,20 +41,8 @@ <artifactId>camel-core</artifactId> </dependency> <dependency> - <groupId>com.ning</groupId> - <artifactId>async-http-client</artifactId> - <version>${ahc-version}</version> - <exclusions> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - <version>${netty3-version}</version> + <groupId>org.apache.camel</groupId> + <artifactId>camel-ahc</artifactId> </dependency> <dependency> <groupId>org.glassfish.grizzly</groupId> http://git-wip-us.apache.org/repos/asf/camel/blob/6208c5af/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsComponent.java b/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsComponent.java index a826aa3..e6832d7 100644 --- a/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsComponent.java +++ b/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsComponent.java @@ -17,127 +17,29 @@ package org.apache.camel.component.ws; import java.net.URI; -import java.util.Map; -import com.ning.http.client.AsyncHttpClientConfig; -import com.ning.http.client.Realm; - -import org.apache.camel.Endpoint; -import org.apache.camel.impl.DefaultComponent; -import org.apache.camel.util.IntrospectionSupport; -import org.apache.camel.util.URISupport; -import org.apache.camel.util.UnsafeUriCharactersEncoder; -import org.apache.camel.util.jsse.SSLContextParameters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.camel.component.ahc.AhcComponent; +import org.apache.camel.component.ahc.AhcEndpoint; /** - * Defines the <a href="http://camel.apache.org/ahc.html">Async HTTP Client Component</a> + * Defines the <a href="http://camel.apache.org/ws.html">WebSocket Client Component</a> */ -public class WsComponent extends DefaultComponent { - - private static final Logger LOG = LoggerFactory.getLogger(WsComponent.class); +public class WsComponent extends AhcComponent { - private static final String CLIENT_CONFIG_PREFIX = "clientConfig."; - private static final String CLIENT_REALM_CONFIG_PREFIX = "clientConfig.realm."; - - private AsyncHttpClientConfig clientConfig; - private SSLContextParameters sslContextParameters; - + /* (non-Javadoc) + * @see org.apache.camel.component.ahc.AhcComponent#createAddressUri(java.lang.String, java.lang.String) + */ @Override - protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - String addressUri = uri; - - // Do not set the URI because we still have all of the Camel internal - // parameters in the URI at this point. - WsEndpoint endpoint = new WsEndpoint(uri, this); - endpoint.setClientConfig(getClientConfig()); - endpoint.setSslContextParameters(getSslContextParameters()); - - setProperties(endpoint, parameters); - - if (IntrospectionSupport.hasProperties(parameters, CLIENT_CONFIG_PREFIX)) { - AsyncHttpClientConfig.Builder builder = endpoint.getClientConfig() == null - ? new AsyncHttpClientConfig.Builder() : WsComponent.cloneConfig(endpoint.getClientConfig()); - - if (endpoint.getClientConfig() != null) { - LOG.warn("The user explicitly set an AsyncHttpClientConfig instance on the component or " - + "endpoint, but this endpoint URI contains client configuration parameters. " - + "Are you sure that this is what was intended? The URI parameters will be applied" - + " to a clone of the supplied AsyncHttpClientConfig in order to prevent unintended modification" - + " of the explicitly configured AsyncHttpClientConfig. That is, the URI parameters override the" - + " settings on the explicitly configured AsyncHttpClientConfig for this endpoint."); - } - - // special for realm builder - Realm.RealmBuilder realmBuilder = null; - if (IntrospectionSupport.hasProperties(parameters, CLIENT_REALM_CONFIG_PREFIX)) { - realmBuilder = new Realm.RealmBuilder(); - - // set and validate additional parameters on client config - Map<String, Object> realmParams = IntrospectionSupport.extractProperties(parameters, CLIENT_REALM_CONFIG_PREFIX); - setProperties(realmBuilder, realmParams); - validateParameters(uri, realmParams, null); - } - - // set and validate additional parameters on client config - Map<String, Object> clientParams = IntrospectionSupport.extractProperties(parameters, CLIENT_CONFIG_PREFIX); - setProperties(builder, clientParams); - validateParameters(uri, clientParams, null); - - if (realmBuilder != null) { - builder.setRealm(realmBuilder.build()); - } - endpoint.setClientConfig(builder.build()); - } - - SSLContextParameters sslparams = resolveAndRemoveReferenceParameter(parameters, "sslContextParameters", SSLContextParameters.class); - - // prefer to use endpoint configured over component configured - if (sslparams == null) { - // fallback to component configured - sslparams = getSslContextParameters(); - } - if (sslparams != null) { - endpoint.setSslContextParameters(sslparams); - } - - // restructure uri to be based on the parameters left as we dont want to include the Camel internal options - addressUri = UnsafeUriCharactersEncoder.encode(addressUri); - URI wsuri = URISupport.createRemainingURI(new URI(addressUri), parameters); - endpoint.setWsUri(wsuri); - - return endpoint; + protected String createAddressUri(String uri, String remaining) { + return uri; } - public AsyncHttpClientConfig getClientConfig() { - return clientConfig; - } - - public void setClientConfig(AsyncHttpClientConfig clientConfig) { - this.clientConfig = clientConfig; - } - - public SSLContextParameters getSslContextParameters() { - return sslContextParameters; - } - - public void setSslContextParameters(SSLContextParameters sslContextParameters) { - this.sslContextParameters = sslContextParameters; - } - - /** - * Creates a new client configuration builder using {@code clientConfig} as a template for - * the builder. - * - * @param clientConfig the instance to serve as a template for the builder - * - * @return a builder configured with the same options as the supplied config + /* (non-Javadoc) + * @see org.apache.camel.component.ahc.AhcComponent#createAhcEndpoint(java.lang.String, org.apache.camel.component.ahc.AhcComponent, java.net.URI) */ - static AsyncHttpClientConfig.Builder cloneConfig(AsyncHttpClientConfig clientConfig) { - - AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder(clientConfig); - - return builder; + @Override + protected AhcEndpoint createAhcEndpoint(String endpointUri, AhcComponent component, URI httpUri) { + return new WsEndpoint(endpointUri, (WsComponent)component); } + } http://git-wip-us.apache.org/repos/asf/camel/blob/6208c5af/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsEndpoint.java b/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsEndpoint.java index 67ebe2f..a4988d4 100644 --- a/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsEndpoint.java +++ b/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsEndpoint.java @@ -20,13 +20,10 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.CharArrayReader; import java.io.IOException; -import java.net.URI; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ExecutionException; -import javax.net.ssl.SSLContext; - import com.ning.http.client.AsyncHttpClient; import com.ning.http.client.AsyncHttpClientConfig; import com.ning.http.client.AsyncHttpProvider; @@ -39,28 +36,22 @@ import com.ning.http.client.websocket.WebSocketUpgradeHandler; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.impl.DefaultEndpoint; -import org.apache.camel.util.jsse.SSLContextParameters; +import org.apache.camel.component.ahc.AhcEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * */ -public class WsEndpoint extends DefaultEndpoint { +public class WsEndpoint extends AhcEndpoint { private static final transient Logger LOG = LoggerFactory.getLogger(WsEndpoint.class); + // for using websocket streaming/fragments private static final boolean GRIZZLY_AVAILABLE = probeClass("com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider"); - private AsyncHttpClient client; - private AsyncHttpClientConfig clientConfig; private WebSocket websocket; private Set<WsConsumer> consumers; - private URI wsUri; - private boolean throwExceptionOnFailure = true; - private boolean transferException; - private SSLContextParameters sslContextParameters; private boolean useStreaming; private static boolean probeClass(String name) { @@ -73,7 +64,7 @@ public class WsEndpoint extends DefaultEndpoint { } public WsEndpoint(String endpointUri, WsComponent component) { - super(endpointUri, component); + super(endpointUri, component, null); this.consumers = new HashSet<WsConsumer>(); } @@ -92,20 +83,13 @@ public class WsEndpoint extends DefaultEndpoint { return new WsConsumer(this, processor); } - - @Override - public boolean isSingleton() { - return true; - } - WebSocket getWebSocket() { synchronized (this) { if (websocket == null) { try { connect(); } catch (Exception e) { - // TODO add the throw exception in the method - e.printStackTrace(); + LOG.error("Failed to connect", e); } } } @@ -116,46 +100,6 @@ public class WsEndpoint extends DefaultEndpoint { this.websocket = websocket; } - public AsyncHttpClientConfig getClientConfig() { - return clientConfig; - } - - public void setClientConfig(AsyncHttpClientConfig clientConfig) { - this.clientConfig = clientConfig; - } - - public boolean isThrowExceptionOnFailure() { - return throwExceptionOnFailure; - } - - public void setThrowExceptionOnFailure(boolean throwExceptionOnFailure) { - this.throwExceptionOnFailure = throwExceptionOnFailure; - } - - public boolean isTransferException() { - return transferException; - } - - public void setTransferException(boolean transferException) { - this.transferException = transferException; - } - - public SSLContextParameters getSslContextParameters() { - return sslContextParameters; - } - - public void setSslContextParameters(SSLContextParameters sslContextParameters) { - this.sslContextParameters = sslContextParameters; - } - - public URI getWsUri() { - return wsUri; - } - - public void setWsUri(URI wsUri) { - this.wsUri = wsUri; - } - /** * @return the useStreaming */ @@ -170,60 +114,36 @@ public class WsEndpoint extends DefaultEndpoint { this.useStreaming = useStreaming; } + /* (non-Javadoc) + * @see org.apache.camel.component.ahc.AhcEndpoint#createClient(com.ning.http.client.AsyncHttpClientConfig) + */ + @Override + protected AsyncHttpClient createClient(AsyncHttpClientConfig config) { + AsyncHttpClient client; + if (config == null) { + config = new AsyncHttpClientConfig.Builder().build(); + } + AsyncHttpProvider ahp = getAsyncHttpProvider(config); + if (ahp == null) { + client = new AsyncHttpClient(config); + } else { + client = new AsyncHttpClient(ahp, config); + } + return client; + } + public void connect() throws InterruptedException, ExecutionException, IOException { - websocket = client.prepareGet(wsUri.toASCIIString()).execute( + websocket = getClient().prepareGet(getHttpUri().toASCIIString()).execute( new WebSocketUpgradeHandler.Builder() .addWebSocketListener(new WsListener()).build()).get(); } @Override - protected void doStart() throws Exception { - super.doStart(); - if (client == null) { - - AsyncHttpClientConfig config = null; - - if (clientConfig != null) { - AsyncHttpClientConfig.Builder builder = WsComponent.cloneConfig(clientConfig); - - if (sslContextParameters != null) { - SSLContext ssl = sslContextParameters.createSSLContext(); - builder.setSSLContext(ssl); - } - - config = builder.build(); - } else { - if (sslContextParameters != null) { - AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder(); - SSLContext ssl = sslContextParameters.createSSLContext(); - builder.setSSLContext(ssl); - config = builder.build(); - } - } - - if (config == null) { - config = new AsyncHttpClientConfig.Builder().build(); - } - - AsyncHttpProvider ahp = getAsyncHttpProvider(config); - if (ahp == null) { - client = new AsyncHttpClient(config); - } else { - client = new AsyncHttpClient(ahp, config); - } - } - } - - @Override protected void doStop() throws Exception { - super.doStop(); if (websocket != null && websocket.isOpen()) { websocket.close(); } - if (client != null && !client.isClosed()) { - client.close(); - } - client = null; + super.doStop(); } void connect(WsConsumer wsConsumer) {