Author: davsclaus
Date: Mon Nov 9 10:16:08 2009
New Revision: 834008
URL: http://svn.apache.org/viewvc?rev=834008&view=rev
Log:
CAMEL-2135: Non blocking async jetty http producer. Work in progress.
Added:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java
(with props)
camel/trunk/components/camel-jetty/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java
- copied, changed from r833983,
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java
- copied, changed from r833429,
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/Registry.java
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyFutureGetBody.java
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerHeaderBasedCBRTestTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
Mon Nov 9 10:16:08 2009
@@ -380,6 +380,7 @@
actualBody = actualBodyValues.get(i);
}
+ // TODO: coerce types before assertEquals
assertEquals("Body of message: " + i, expectedBody,
actualBody);
}
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/Registry.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/Registry.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/Registry.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/Registry.java Mon
Nov 9 10:16:08 2009
@@ -20,29 +20,29 @@
/**
* Represents a service registry which may be implemented via a Spring
ApplicationContext,
- * via JNDI, a simple Map or the OSGI Service Registry
+ * via JNDI, a simple Map or the OSGi Service Registry
*
* @version $Revision$
*/
public interface Registry {
/**
- * Looks up a service in the registry, returning the service or null if it
could not be found.
+ * Looks up a service in the registry based purely on name,
+ * returning the service or <tt>null</tt> if it could not be found.
*
* @param name the name of the service
- * @param type the type of the required service
- * @return the service from the registry or null if it could not be found
+ * @return the service from the registry or <tt>null</tt> if it could not
be found
*/
- <T> T lookup(String name, Class<T> type);
+ Object lookup(String name);
/**
- * Looks up a service in the registry based purely on name,
- * returning the service or null if it could not be found.
+ * Looks up a service in the registry, returning the service or
<tt>null</tt> if it could not be found.
*
* @param name the name of the service
- * @return the service from the registry or null if it could not be found
+ * @param type the type of the required service
+ * @return the service from the registry or <tt>null</tt> if it could not
be found
*/
- Object lookup(String name);
+ <T> T lookup(String name, Class<T> type);
/**
* Looks up services in the registry by their type.
Modified:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
---
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
(original)
+++
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
Mon Nov 9 10:16:08 2009
@@ -18,11 +18,15 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.Exchange;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.mortbay.io.Buffer;
import org.mortbay.jetty.HttpHeaders;
import org.mortbay.jetty.client.ContentExchange;
@@ -34,16 +38,28 @@
*/
public class JettyContentExchange extends ContentExchange {
+ private static final transient Log LOG =
LogFactory.getLog(JettyContentExchange.class);
+
+ private final Map<String, String> headers = new LinkedHashMap<String,
String>();
private CountDownLatch headersComplete = new CountDownLatch(1);
private CountDownLatch bodyComplete = new CountDownLatch(1);
- private final Map<String, String> headers = new LinkedHashMap<String,
String>();
- private boolean failed;
+ private volatile boolean failed;
+ private volatile Exchange exchange;
+ private volatile Collection<Exchange> completeTasks;
public JettyContentExchange() {
// keep headers by default
super(true);
}
+ public void setExchange(Exchange exchange) {
+ this.exchange = exchange;
+ }
+
+ public void setCompleteTasks(Collection<Exchange> completeTasks) {
+ this.completeTasks = completeTasks;
+ }
+
@Override
protected void onResponseHeader(Buffer name, Buffer value) throws
IOException {
super.onResponseHeader(name, value);
@@ -53,11 +69,26 @@
@Override
protected void onResponseHeaderComplete() throws IOException {
headersComplete.countDown();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("onResponseHeader for " + getUrl());
+ }
}
@Override
protected void onResponseComplete() throws IOException {
bodyComplete.countDown();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("onResponseComplete for " + getUrl());
+ }
+
+ if (completeTasks != null && exchange != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Adding Exchange to completed task: " + exchange);
+ }
+ // we are complete so add the exchange to completed tasks
+ completeTasks.add(exchange);
+ }
}
@Override
@@ -83,14 +114,23 @@
}
public void waitForHeadersToComplete() throws InterruptedException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for headers to complete for " + getUrl());
+ }
headersComplete.await();
}
public void waitForBodyToComplete() throws InterruptedException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for body to complete for " + getUrl());
+ }
bodyComplete.await();
}
public boolean waitForBodyToComplete(long timeout, TimeUnit timeUnit)
throws InterruptedException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for body to complete for " + getUrl());
+ }
return bodyComplete.await(timeout, timeUnit);
}
Added:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java?rev=834008&view=auto
==============================================================================
---
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java
(added)
+++
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java
Mon Nov 9 10:16:08 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jetty;
+
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.FallbackConverter;
+import org.apache.camel.StreamCache;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.spi.TypeConverterRegistry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision$
+ */
+@Converter
+public final class JettyConverter {
+
+ private static final Log LOG = LogFactory.getLog(JettyConverter.class);
+
+ private JettyConverter() {
+ }
+
+ @FallbackConverter
+ @SuppressWarnings("unchecked")
+ public static Object convertTo(Class<?> type, Exchange exchange, Object
value, TypeConverterRegistry registry) throws Exception {
+ // do not convert to stream cache
+ if (StreamCache.class.isAssignableFrom(value.getClass())) {
+ return null;
+ }
+
+ if (JettyHttpMessage.class.isAssignableFrom(value.getClass())) {
+ JettyHttpMessage message = (JettyHttpMessage) value;
+
+ Object body = message.getBody();
+ if (body == null) {
+ return null;
+ }
+
+ if (body instanceof JettyFutureGetBody) {
+ JettyFutureGetBody future = (JettyFutureGetBody) body;
+
+ if (future.isCancelled()) {
+ // return void to indicate its not possible to convert at
this time
+ return Void.TYPE;
+ }
+
+ // do some trace logging as the get is blocking until the
response is ready
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Getting future response");
+ }
+
+ Object reply = future.get();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Got future response");
+ }
+
+ if (reply == null) {
+ // return void to indicate its not possible to convert at
this time
+ return Void.TYPE;
+ }
+
+ // maybe from is already the type we want
+ if (type.isAssignableFrom(reply.getClass())) {
+ return type.cast(reply);
+ }
+
+ // no then try to lookup a type converter
+ TypeConverter tc = registry.lookup(type, reply.getClass());
+ if (tc != null) {
+ return tc.convertTo(type, exchange, reply);
+ }
+ } else {
+ // no then try to lookup a type converter
+ TypeConverter tc = registry.lookup(type, body.getClass());
+ if (tc != null) {
+ return tc.convertTo(type, exchange, body);
+ }
+ }
+ }
+
+ return null;
+ }
+
+}
Propchange:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyFutureGetBody.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyFutureGetBody.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
---
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyFutureGetBody.java
(original)
+++
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyFutureGetBody.java
Mon Nov 9 10:16:08 2009
@@ -77,6 +77,7 @@
if (done) {
return doGetBody();
} else {
+ // timeout occurred
ExchangeTimedOutException cause = new
ExchangeTimedOutException(exchange, timeout);
throw new ExecutionException(cause);
}
Modified:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
---
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
(original)
+++
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
Mon Nov 9 10:16:08 2009
@@ -23,6 +23,7 @@
import java.util.Map;
import org.apache.camel.Endpoint;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.http.CamelServlet;
import org.apache.camel.component.http.HttpComponent;
import org.apache.camel.component.http.HttpConsumer;
@@ -31,12 +32,13 @@
import org.apache.camel.util.IntrospectionSupport;
import org.apache.camel.util.URISupport;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
-import org.apache.commons.httpclient.params.HttpClientParams;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.mortbay.component.LifeCycle;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Handler;
import org.mortbay.jetty.Server;
+import org.mortbay.jetty.client.Address;
import org.mortbay.jetty.client.HttpClient;
import org.mortbay.jetty.handler.ContextHandlerCollection;
import org.mortbay.jetty.nio.SelectChannelConnector;
@@ -44,6 +46,8 @@
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.jetty.servlet.SessionHandler;
+import org.mortbay.thread.QueuedThreadPool;
+import org.mortbay.thread.ThreadPool;
/**
* An HttpComponent which starts an embedded Jetty for to handle consuming from
@@ -63,6 +67,9 @@
protected String sslKeystore;
protected Map<Integer, SslSocketConnector> sslSocketConnectors;
protected HttpClient httpClient;
+ protected ThreadPool httpClientThreadPool;
+ protected Integer httpClientMinThreads;
+ protected Integer httpClientMaxThreads;
class ConnectorRef {
Server server;
@@ -91,9 +98,6 @@
protected Endpoint createEndpoint(String uri, String remaining, Map
parameters) throws Exception {
uri = uri.startsWith("jetty:") ? remaining : uri;
- HttpClientParams params = new HttpClientParams();
- IntrospectionSupport.setProperties(params, parameters, "httpClient.");
-
// handlers
List<Handler> handlerList = new ArrayList<Handler>();
String handlers = getAndRemoveParameter(parameters, "handlers",
String.class);
@@ -109,7 +113,7 @@
// configure regular parameters
configureParameters(parameters);
- JettyHttpEndpoint result = new JettyHttpEndpoint(this, uri, null,
params, getHttpConnectionManager(), httpClientConfigurer);
+ JettyHttpEndpoint result = new JettyHttpEndpoint(this, uri, null);
if (httpBinding != null) {
result.setBinding(httpBinding);
}
@@ -119,6 +123,16 @@
}
setProperties(result, parameters);
+ // configure http client if we have url configuration for it
+ if (IntrospectionSupport.hasProperties(parameters, "httpClient.")) {
+ // configure Jetty http client
+ result.setClient(getHttpClient());
+ // set additional parameters on http client
+ IntrospectionSupport.setProperties(getHttpClient(), parameters,
"httpClient.");
+ // validate that we could resolve all httpClient. parameters as
this component is lenient
+ validateParameters(uri, parameters, "httpClient.");
+ }
+
// create the http uri after we have configured all the parameters on
the camel objects
URI httpUri = URISupport.createRemainingURI(new
URI(UnsafeUriCharactersEncoder.encode(uri)), parameters);
result.setHttpUri(httpUri);
@@ -184,8 +198,7 @@
}
/**
- * Disconnects the URL specified on the endpoint from the specified
- * processor.
+ * Disconnects the URL specified on the endpoint from the specified
processor.
*/
@Override
public void disconnect(HttpConsumer consumer) throws Exception {
@@ -281,7 +294,39 @@
sslSocketConnectors = connectors;
}
- public HttpClient getHttpClient() {
+ public synchronized HttpClient getHttpClient() {
+ if (httpClient == null) {
+ httpClient = new HttpClient();
+ httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+
+ if (System.getProperty("http.proxyHost") != null &&
System.getProperty("http.proxyPort") != null) {
+ String host = System.getProperty("http.proxyHost");
+ int port =
Integer.parseInt(System.getProperty("http.proxyPort"));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Java System Property http.proxyHost and
http.proxyPort detected. Using http proxy host: "
+ + host + " port: " + port);
+ }
+ httpClient.setProxy(new Address(host, port));
+ }
+
+ // use QueueThreadPool as the default bounded is deprecated (see
SMXCOMP-157)
+ if (getHttpClientThreadPool() == null) {
+ QueuedThreadPool qtp = new QueuedThreadPool();
+ if (httpClientMinThreads != null) {
+ qtp.setMinThreads(httpClientMinThreads.intValue());
+ }
+ if (httpClientMaxThreads != null) {
+ qtp.setMaxThreads(httpClientMaxThreads.intValue());
+ }
+ try {
+ qtp.start();
+ } catch (Exception e) {
+ throw new RuntimeCamelException("Error starting
JettyHttpClient thread pool: " + qtp, e);
+ }
+ setHttpClientThreadPool(qtp);
+ }
+ httpClient.setThreadPool(getHttpClientThreadPool());
+ }
return httpClient;
}
@@ -289,6 +334,30 @@
this.httpClient = httpClient;
}
+ public ThreadPool getHttpClientThreadPool() {
+ return httpClientThreadPool;
+ }
+
+ public void setHttpClientThreadPool(ThreadPool httpClientThreadPool) {
+ this.httpClientThreadPool = httpClientThreadPool;
+ }
+
+ public Integer getHttpClientMinThreads() {
+ return httpClientMinThreads;
+ }
+
+ public void setHttpClientMinThreads(Integer httpClientMinThreads) {
+ this.httpClientMinThreads = httpClientMinThreads;
+ }
+
+ public Integer getHttpClientMaxThreads() {
+ return httpClientMaxThreads;
+ }
+
+ public void setHttpClientMaxThreads(Integer httpClientMaxThreads) {
+ this.httpClientMaxThreads = httpClientMaxThreads;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
protected CamelServlet createServletForConnector(Server server, Connector
connector, List<Handler> handlers) throws Exception {
@@ -320,4 +389,28 @@
server.start();
return server;
}
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ if (httpClientThreadPool != null && httpClientThreadPool instanceof
LifeCycle) {
+ LifeCycle lc = (LifeCycle) httpClientThreadPool;
+ lc.start();
+ }
+ if (httpClient != null && !httpClient.isStarted()) {
+ httpClient.start();
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ if (httpClient != null) {
+ httpClient.stop();
+ }
+ if (httpClientThreadPool != null && httpClientThreadPool instanceof
LifeCycle) {
+ LifeCycle lc = (LifeCycle) httpClientThreadPool;
+ lc.stop();
+ }
+ }
}
Modified:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
---
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java
(original)
+++
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java
Mon Nov 9 10:16:08 2009
@@ -23,15 +23,9 @@
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.component.http.HttpClientConfigurer;
import org.apache.camel.component.http.HttpConsumer;
import org.apache.camel.component.http.HttpEndpoint;
-import org.apache.commons.httpclient.HttpConnectionManager;
-import org.apache.commons.httpclient.params.HttpClientParams;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.mortbay.jetty.Handler;
-import org.mortbay.jetty.client.Address;
import org.mortbay.jetty.client.HttpClient;
/**
@@ -39,14 +33,11 @@
*/
public class JettyHttpEndpoint extends HttpEndpoint {
- private static final transient Log LOG =
LogFactory.getLog(JettyHttpEndpoint.class);
private boolean sessionSupport;
private List<Handler> handlers;
-
- public JettyHttpEndpoint(JettyHttpComponent component, String uri, URI
httpURL, HttpClientParams clientParams,
- HttpConnectionManager httpConnectionManager,
HttpClientConfigurer clientConfigurer) throws URISyntaxException {
- super(uri, component, httpURL, clientParams, httpConnectionManager,
clientConfigurer);
- }
+ private HttpClient client;
+ private boolean synchronous = true;
+ private int concurrentConsumers = 1;
public JettyHttpEndpoint(JettyHttpComponent component, String uri, URI
httpURL) throws URISyntaxException {
super(uri, component, httpURL);
@@ -59,7 +50,7 @@
@Override
public Producer createProducer() throws Exception {
- return new JettyHttpProducer(this);
+ return new JettyHttpProducer(this, getClient());
}
@Override
@@ -83,30 +74,31 @@
this.handlers = handlers;
}
- /**
- * Factory method used by producers and consumers to create a new
{@link org.apache.commons.httpclient.HttpClient} instance
- */
- public HttpClient getJettyHttpClient() throws Exception {
- HttpClient answer = getComponent().getHttpClient();
- if (answer == null) {
- answer = new HttpClient();
- answer.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
-
- if (System.getProperty("http.proxyHost") != null &&
System.getProperty("http.proxyPort") != null) {
- String host = System.getProperty("http.proxyHost");
- int port =
Integer.parseInt(System.getProperty("http.proxyPort"));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Java System Property http.proxyHost and
http.proxyPort detected. Using http proxy host: "
- + host + " port: " + port);
- }
- answer.setProxy(new Address(host, port));
- }
+ public HttpClient getClient() {
+ if (client == null) {
+ return getComponent().getHttpClient();
+ }
+ return client;
+ }
- // TODO: allow jetty producer configuration from uri
+ public void setClient(HttpClient client) {
+ this.client = client;
+ }
- answer.start();
- }
- return answer;
+ public boolean isSynchronous() {
+ return synchronous;
+ }
+
+ public void setSynchronous(boolean synchronous) {
+ this.synchronous = synchronous;
+ }
+
+ public int getConcurrentConsumers() {
+ return concurrentConsumers;
+ }
+
+ public void setConcurrentConsumers(int concurrentConsumers) {
+ this.concurrentConsumers = concurrentConsumers;
}
}
Modified:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
---
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
(original)
+++
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
Mon Nov 9 10:16:08 2009
@@ -19,14 +19,21 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.component.http.HttpMethods;
import org.apache.camel.component.http.helper.HttpProducerHelper;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mortbay.jetty.client.HttpClient;
@@ -34,12 +41,18 @@
/**
* @version $Revision$
*/
-public class JettyHttpProducer extends DefaultProducer {
+public class JettyHttpProducer extends DefaultProducer implements Runnable {
private static final transient Log LOG =
LogFactory.getLog(JettyHttpProducer.class);
- private boolean throwException;
+ private final BlockingQueue<Exchange> completeTasks = new
LinkedBlockingQueue<Exchange>();
+ private ExecutorService executor;
+ private final HttpClient client;
- public JettyHttpProducer(Endpoint endpoint) {
+ // TODO: support that bridge option
+ // TODO: more unit tests
+
+ public JettyHttpProducer(Endpoint endpoint, HttpClient client) {
super(endpoint);
+ this.client = client;
}
@Override
@@ -48,22 +61,47 @@
}
public void process(Exchange exchange) throws Exception {
- HttpClient client = getEndpoint().getJettyHttpClient();
+ HttpClient client = getEndpoint().getClient();
JettyContentExchange httpExchange = createHttpExchange(exchange);
+ if (getEndpoint().isSynchronous()) {
+ sendSynchronous(exchange, client, httpExchange);
+ } else {
+ sendAsynchronous(exchange, client, httpExchange);
+ }
+ }
+
+ protected void sendAsynchronous(final Exchange exchange, final HttpClient
client, final JettyContentExchange httpExchange) throws IOException {
+ // use a new copy of the exchange to route async and handover the on
completion to the new copy
+ // so its the new copy that performs the on completion callback when
its done
+ final Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange,
true);
+ // the copy must be an in ouy MEP
+ copy.setPattern(ExchangePattern.InOut);
+
+ // configure http exchange to signal when its complete
+ httpExchange.setCompleteTasks(completeTasks);
+ httpExchange.setExchange(copy);
+
// set the body with the message holder
- exchange.setOut(new JettyHttpMessage(exchange, httpExchange,
getEndpoint().isThrowExceptionOnFailure()));
+ copy.setOut(new JettyHttpMessage(exchange, httpExchange,
getEndpoint().isThrowExceptionOnFailure()));
- // will send it async
- sendExchange(client, httpExchange);
+ doSendExchange(client, httpExchange);
- // TODO: configuration of http client as getter/setters and URI on
component
- // TODO: support that bridge option
- // TODO: more unit tests
+ // now we need to let the original exchange to stop
+ // and let that copy exchange continue
+ // TODO: Use something that marks it as async routed
+ exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
}
- protected void sendExchange(HttpClient client, JettyContentExchange
httpExchange) throws IOException {
+ protected void sendSynchronous(Exchange exchange, HttpClient client,
JettyContentExchange httpExchange) throws IOException {
+ // set the body with the message holder
+ exchange.setOut(new JettyHttpMessage(exchange, httpExchange,
getEndpoint().isThrowExceptionOnFailure()));
+
+ doSendExchange(client, httpExchange);
+ }
+
+ protected void doSendExchange(HttpClient client, JettyContentExchange
httpExchange) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending HTTP request to: " + httpExchange.getUrl());
}
@@ -103,4 +141,56 @@
}
}
+ public void run() {
+ while (isRunAllowed()) {
+ Exchange exchange;
+ try {
+ // TODO: Wonder if we can use take instead of poll with
timeout?
+ exchange = completeTasks.poll(1000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.debug("Sleep interrupted, are we stopping? " +
(isStopping() || isStopped()));
+ continue;
+ }
+
+ if (exchange != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Continue to route Exchange: " + exchange);
+ }
+
+ // TODO: hook into exiting route path
+ exchange.getContext().createProducerTemplate().send("mock:result", exchange);
+ }
+ }
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ client.start();
+
+ // this is only needed if we are asynchronous where we need to have a
thread pool of listeners
+ // that will process the completed tasks
+ if (!getEndpoint().isSynchronous()) {
+ int poolSize = getEndpoint().getConcurrentConsumers();
+ executor = ExecutorServiceHelper.newFixedThreadPool(poolSize,
getEndpoint().getEndpointUri(), true);
+ for (int i = 0; i < poolSize; i++) {
+ executor.execute(this);
+ }
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+
+ client.stop();
+
+ if (executor != null) {
+ executor.shutdownNow();
+ executor = null;
+ }
+ completeTasks.clear();
+ }
+
}
Added:
camel/trunk/components/camel-jetty/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/resources/META-INF/services/org/apache/camel/TypeConverter?rev=834008&view=auto
==============================================================================
---
camel/trunk/components/camel-jetty/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
(added)
+++
camel/trunk/components/camel-jetty/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
Mon Nov 9 10:16:08 2009
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.camel.component.jetty
\ No newline at end of file
Copied:
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java
(from r833983,
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java?p2=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java&p1=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java&r1=833983&r2=834008&rev=834008&view=diff
==============================================================================
---
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java
(original)
+++
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java
Mon Nov 9 10:16:08 2009
@@ -16,35 +16,40 @@
*/
package org.apache.camel.component.jetty.jettyproducer;
-import java.util.concurrent.Future;
import javax.servlet.http.HttpServletResponse;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
/**
* @version $Revision$
*/
-public class JettyHttpProducerSlowResponseTest extends CamelTestSupport {
+public class JettyHttpProducerAsynchronousTest extends CamelTestSupport {
- private String url = "jetty://http://0.0.0.0:9123/foo";
+ private static String thread1;
+ private static String thread2;
+
+ private String url =
"jetty://http://0.0.0.0:9123/foo?synchronous=false&concurrentConsumers=5";
@Test
- public void testSlowReply() throws Exception {
- Exchange exchange = template.request(url, null);
- assertNotNull(exchange);
-
- Future<String> future = exchange.getOut().getBody(Future.class);
- assertNotNull(future);
- assertEquals(false, future.isDone());
+ public void testAsynchronous() throws Exception {
+ thread1 = "";
+ thread2 = "";
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+ mock.message(0).outBody().isEqualTo("Bye World");
- String reply = future.get();
- assertEquals("Bye World", reply);
+ Object body = null;
+ template.sendBody("direct:start", body);
- assertEquals(3, exchange.getOut().getHeaders().size());
+ assertMockEndpointsSatisfied();
+
+ assertNotSame("Should not use same threads", thread1, thread2);
}
@Override
@@ -52,6 +57,16 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
+ from("direct:start").process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ thread1 = Thread.currentThread().getName();
+ }
+ }).to(url).process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ thread2 = Thread.currentThread().getName();
+ }
+ }).to("mock:result");
+
from(url).process(new Processor() {
public void process(Exchange exchange) throws Exception {
HttpServletResponse res =
exchange.getIn().getBody(HttpServletResponse.class);
Copied:
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java
(from r833429,
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java?p2=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java&p1=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java&r1=833429&r2=834008&rev=834008&view=diff
==============================================================================
---
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java
(original)
+++
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java
Mon Nov 9 10:16:08 2009
@@ -16,29 +16,26 @@
*/
package org.apache.camel.component.jetty.jettyproducer;
-import java.util.concurrent.Future;
-
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
/**
* @version $Revision$
*/
-public class JettyHttpProducerGoogleTest extends CamelTestSupport {
+public class JettyHttpProducerGoogleAsynchronousTest extends CamelTestSupport {
@Test
- public void testGoogleFrontPage() throws Exception {
- String reply = template.requestBody("direct:start", null,
String.class);
- assertNotNull(reply);
- }
+ public void testGoogleFrontPageAsync() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+ mock.message(0).outBody(String.class).contains("google");
- @Test
- public void testGoogleFrontPageFutureTask() throws Exception {
- Object body = null;
- Future<String> reply = (Future<String>)
template.requestBody("direct:start", body);
- assertNotNull(reply);
- assertNotNull(reply.get());
+ template.sendBody("direct:start", null);
+ System.out.println("I am not blocked");
+
+ assertMockEndpointsSatisfied();
}
@Override
@@ -46,10 +43,11 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // to prevent redirect being thrown as an exception
-
from("direct:start").to("jetty://http://www.google.com?throwExceptionOnFailure=false");
+ from("direct:start")
+ // to prevent redirect being thrown as an exception
+
.to("jetty://http://www.google.com?throwExceptionOnFailure=false&synchronous=false&concurrentConsumers=5")
+ .to("mock:result");
}
};
}
-}
-
+}
\ No newline at end of file
Modified:
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
---
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java
(original)
+++
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java
Mon Nov 9 10:16:08 2009
@@ -47,7 +47,7 @@
@Override
public void configure() throws Exception {
// to prevent redirect being thrown as an exception
-
from("direct:start").to("jetty://http://www.google.com?throwExceptionOnFailure=false");
+
from("direct:start").to("jetty://http://www.google.com?throwExceptionOnFailure=false&synchronous=true");
}
};
}
Modified:
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerHeaderBasedCBRTestTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerHeaderBasedCBRTestTest.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
---
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerHeaderBasedCBRTestTest.java
(original)
+++
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerHeaderBasedCBRTestTest.java
Mon Nov 9 10:16:08 2009
@@ -31,8 +31,8 @@
*/
public class JettyHttpProducerHeaderBasedCBRTestTest extends CamelTestSupport {
- private String url = "jetty://http://0.0.0.0:9123/foo";
private static String step;
+ private String url = "jetty://http://0.0.0.0:9123/foo";
@Test
@SuppressWarnings("unchecked")
@@ -49,6 +49,7 @@
});
template.sendBody("direct:start", "Hello World");
+ step += "D";
assertMockEndpointsSatisfied();
@@ -58,7 +59,7 @@
assertEquals("Bye World", future.get());
// and ensure that we could CBR on the header before we got the reply
body
- assertEquals("ACB", step);
+ assertTrue("Should be either ACDB or ADCB was " + step,
step.equals("ACDB") || step.equals("ADCB"));
}
@Override