Author: davsclaus
Date: Mon Nov  9 10:16:08 2009
New Revision: 834008

URL: http://svn.apache.org/viewvc?rev=834008&view=rev
Log:
CAMEL-2135: Non blocking async jetty http producer. Work in progress.

Added:
    
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java
   (with props)
    
camel/trunk/components/camel-jetty/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
    
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java
      - copied, changed from r833983, 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java
    
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java
      - copied, changed from r833429, 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/Registry.java
    
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
    
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyFutureGetBody.java
    
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
    
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java
    
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
    
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java
    
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerHeaderBasedCBRTestTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
 Mon Nov  9 10:16:08 2009
@@ -380,6 +380,7 @@
                         actualBody = actualBodyValues.get(i);
                     }
 
+                    // TODO: coerce types before assertEquals
                     assertEquals("Body of message: " + i, expectedBody, 
actualBody);
                 }
             }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/Registry.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/Registry.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/Registry.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/Registry.java Mon 
Nov  9 10:16:08 2009
@@ -20,29 +20,29 @@
 
 /**
  * Represents a service registry which may be implemented via a Spring 
ApplicationContext,
- * via JNDI, a simple Map or the OSGI Service Registry
+ * via JNDI, a simple Map or the OSGi Service Registry
  *
  * @version $Revision$
  */
 public interface Registry {
 
     /**
-     * Looks up a service in the registry, returning the service or null if it 
could not be found.
+     * Looks up a service in the registry based purely on name,
+     * returning the service or <tt>null</tt> if it could not be found.
      *
      * @param name the name of the service
-     * @param type the type of the required service
-     * @return the service from the registry or null if it could not be found
+     * @return the service from the registry or <tt>null</tt> if it could not 
be found
      */
-    <T> T lookup(String name, Class<T> type);
+    Object lookup(String name);
 
     /**
-     * Looks up a service in the registry based purely on name,
-     * returning the service or null if it could not be found.
+     * Looks up a service in the registry, returning the service or 
<tt>null</tt> if it could not be found.
      *
      * @param name the name of the service
-     * @return the service from the registry or null if it could not be found
+     * @param type the type of the required service
+     * @return the service from the registry or <tt>null</tt> if it could not 
be found
      */
-    Object lookup(String name);
+    <T> T lookup(String name, Class<T> type);
 
     /**
      * Looks up services in the registry by their type.

Modified: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
 Mon Nov  9 10:16:08 2009
@@ -18,11 +18,15 @@
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.Exchange;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.mortbay.io.Buffer;
 import org.mortbay.jetty.HttpHeaders;
 import org.mortbay.jetty.client.ContentExchange;
@@ -34,16 +38,28 @@
  */
 public class JettyContentExchange extends ContentExchange {
 
+    private static final transient Log LOG = 
LogFactory.getLog(JettyContentExchange.class);
+
+    private final Map<String, String> headers = new LinkedHashMap<String, 
String>();
     private CountDownLatch headersComplete = new CountDownLatch(1);
     private CountDownLatch bodyComplete = new CountDownLatch(1);
-    private final Map<String, String> headers = new LinkedHashMap<String, 
String>();
-    private boolean failed;
+    private volatile boolean failed;
+    private volatile Exchange exchange;
+    private volatile Collection<Exchange> completeTasks;
 
     public JettyContentExchange() {
         // keep headers by default
         super(true);
     }
 
+    public void setExchange(Exchange exchange) {
+        this.exchange = exchange;
+    }
+
+    public void setCompleteTasks(Collection<Exchange> completeTasks) {
+        this.completeTasks = completeTasks;
+    }
+
     @Override
     protected void onResponseHeader(Buffer name, Buffer value) throws 
IOException {
         super.onResponseHeader(name, value);
@@ -53,11 +69,26 @@
     @Override
     protected void onResponseHeaderComplete() throws IOException {
         headersComplete.countDown();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("onResponseHeader for " + getUrl());
+        }
     }
 
     @Override
     protected void onResponseComplete() throws IOException {
         bodyComplete.countDown();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("onResponseComplete for " + getUrl());
+        }
+
+        if (completeTasks != null && exchange != null) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Adding Exchange to completed task: " + exchange);
+            }
+            // we are complete so add the exchange to completed tasks
+            completeTasks.add(exchange);
+        }
     }
 
     @Override
@@ -83,14 +114,23 @@
     }
 
     public void waitForHeadersToComplete() throws InterruptedException {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for headers to complete for " + getUrl());
+        }
         headersComplete.await();
     }
 
     public void waitForBodyToComplete() throws InterruptedException {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for body to complete for " + getUrl());
+        }
         bodyComplete.await();
     }
 
     public boolean waitForBodyToComplete(long timeout, TimeUnit timeUnit) 
throws InterruptedException {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for body to complete for " + getUrl());
+        }
         return bodyComplete.await(timeout, timeUnit);
     }
 

Added: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java?rev=834008&view=auto
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java
 (added)
+++ 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java
 Mon Nov  9 10:16:08 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jetty;
+
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.FallbackConverter;
+import org.apache.camel.StreamCache;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.spi.TypeConverterRegistry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision$
+ */
+...@converter
+public final class JettyConverter {
+
+    private static final Log LOG = LogFactory.getLog(JettyConverter.class);
+
+    private JettyConverter() {
+    }
+
+    @FallbackConverter
+    @SuppressWarnings("unchecked")
+    public static Object convertTo(Class<?> type, Exchange exchange, Object 
value, TypeConverterRegistry registry) throws Exception {
+        // do not convert to stream cache
+        if (StreamCache.class.isAssignableFrom(value.getClass())) {
+            return null;
+        }
+
+        if (JettyHttpMessage.class.isAssignableFrom(value.getClass())) {
+            JettyHttpMessage message = (JettyHttpMessage) value;
+
+            Object body = message.getBody();
+            if (body == null) {
+                return null;
+            }
+
+            if (body instanceof JettyFutureGetBody) {
+                JettyFutureGetBody future = (JettyFutureGetBody) body;
+
+                if (future.isCancelled()) {
+                    // return void to indicate its not possible to convert at 
this time
+                    return Void.TYPE;
+                }
+
+                // do some trace logging as the get is blocking until the 
response is ready
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Getting future response");
+                }
+
+                Object reply = future.get();
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Got future response");
+                }
+
+                if (reply == null) {
+                    // return void to indicate its not possible to convert at 
this time
+                    return Void.TYPE;
+                }
+
+                // maybe from is already the type we want
+                if (type.isAssignableFrom(reply.getClass())) {
+                    return type.cast(reply);
+                }
+
+                // no then try to lookup a type converter
+                TypeConverter tc = registry.lookup(type, reply.getClass());
+                if (tc != null) {
+                    return tc.convertTo(type, exchange, reply);
+                }
+            } else {
+                // no then try to lookup a type converter
+                TypeConverter tc = registry.lookup(type, body.getClass());
+                if (tc != null) {
+                    return tc.convertTo(type, exchange, body);
+                }
+            }
+        }
+
+        return null;
+    }
+
+}

Propchange: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyConverter.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyFutureGetBody.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyFutureGetBody.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyFutureGetBody.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyFutureGetBody.java
 Mon Nov  9 10:16:08 2009
@@ -77,6 +77,7 @@
             if (done) {
                 return doGetBody();
             } else {
+                // timeout occurred
                 ExchangeTimedOutException cause = new 
ExchangeTimedOutException(exchange, timeout);
                 throw new ExecutionException(cause);
             }

Modified: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
 Mon Nov  9 10:16:08 2009
@@ -23,6 +23,7 @@
 import java.util.Map;
 
 import org.apache.camel.Endpoint;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.http.CamelServlet;
 import org.apache.camel.component.http.HttpComponent;
 import org.apache.camel.component.http.HttpConsumer;
@@ -31,12 +32,13 @@
 import org.apache.camel.util.IntrospectionSupport;
 import org.apache.camel.util.URISupport;
 import org.apache.camel.util.UnsafeUriCharactersEncoder;
-import org.apache.commons.httpclient.params.HttpClientParams;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.mortbay.component.LifeCycle;
 import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Handler;
 import org.mortbay.jetty.Server;
+import org.mortbay.jetty.client.Address;
 import org.mortbay.jetty.client.HttpClient;
 import org.mortbay.jetty.handler.ContextHandlerCollection;
 import org.mortbay.jetty.nio.SelectChannelConnector;
@@ -44,6 +46,8 @@
 import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.jetty.servlet.SessionHandler;
+import org.mortbay.thread.QueuedThreadPool;
+import org.mortbay.thread.ThreadPool;
 
 /**
  * An HttpComponent which starts an embedded Jetty for to handle consuming from
@@ -63,6 +67,9 @@
     protected String sslKeystore;
     protected Map<Integer, SslSocketConnector> sslSocketConnectors;
     protected HttpClient httpClient;
+    protected ThreadPool httpClientThreadPool;
+    protected Integer httpClientMinThreads;
+    protected Integer httpClientMaxThreads;
 
     class ConnectorRef {
         Server server;
@@ -91,9 +98,6 @@
     protected Endpoint createEndpoint(String uri, String remaining, Map 
parameters) throws Exception {
         uri = uri.startsWith("jetty:") ? remaining : uri;
 
-        HttpClientParams params = new HttpClientParams();
-        IntrospectionSupport.setProperties(params, parameters, "httpClient.");
-
         // handlers
         List<Handler> handlerList = new ArrayList<Handler>();
         String handlers = getAndRemoveParameter(parameters, "handlers", 
String.class);
@@ -109,7 +113,7 @@
         // configure regular parameters
         configureParameters(parameters);
 
-        JettyHttpEndpoint result = new JettyHttpEndpoint(this, uri, null, 
params, getHttpConnectionManager(), httpClientConfigurer);
+        JettyHttpEndpoint result = new JettyHttpEndpoint(this, uri, null);
         if (httpBinding != null) {
             result.setBinding(httpBinding);
         }
@@ -119,6 +123,16 @@
         }
         setProperties(result, parameters);
 
+        // configure http client if we have url configuration for it
+        if (IntrospectionSupport.hasProperties(parameters, "httpClient.")) {
+            // configure Jetty http client
+            result.setClient(getHttpClient());
+            // set additional parameters on http client
+            IntrospectionSupport.setProperties(getHttpClient(), parameters, 
"httpClient.");
+            // validate that we could resolve all httpClient. parameters as 
this component is lenient
+            validateParameters(uri, parameters, "httpClient.");
+        }
+
         // create the http uri after we have configured all the parameters on 
the camel objects
         URI httpUri = URISupport.createRemainingURI(new 
URI(UnsafeUriCharactersEncoder.encode(uri)), parameters);
         result.setHttpUri(httpUri);
@@ -184,8 +198,7 @@
     }
 
     /**
-     * Disconnects the URL specified on the endpoint from the specified
-     * processor.
+     * Disconnects the URL specified on the endpoint from the specified 
processor.
      */
     @Override
     public void disconnect(HttpConsumer consumer) throws Exception {
@@ -281,7 +294,39 @@
         sslSocketConnectors = connectors;
     }
 
-    public HttpClient getHttpClient() {
+    public synchronized HttpClient getHttpClient() {
+        if (httpClient == null) {
+            httpClient = new HttpClient();
+            httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+
+            if (System.getProperty("http.proxyHost") != null && 
System.getProperty("http.proxyPort") != null) {
+                String host = System.getProperty("http.proxyHost");
+                int port = 
Integer.parseInt(System.getProperty("http.proxyPort"));
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Java System Property http.proxyHost and 
http.proxyPort detected. Using http proxy host: "
+                            + host + " port: " + port);
+                }
+                httpClient.setProxy(new Address(host, port));
+            }
+
+            // use QueueThreadPool as the default bounded is deprecated (see 
SMXCOMP-157)
+            if (getHttpClientThreadPool() == null) {
+                QueuedThreadPool qtp = new QueuedThreadPool();
+                if (httpClientMinThreads != null) {
+                    qtp.setMinThreads(httpClientMinThreads.intValue());
+                }
+                if (httpClientMaxThreads != null) {
+                    qtp.setMaxThreads(httpClientMaxThreads.intValue());
+                }
+                try {
+                    qtp.start();
+                } catch (Exception e) {
+                    throw new RuntimeCamelException("Error starting 
JettyHttpClient thread pool: " + qtp, e);
+                }
+                setHttpClientThreadPool(qtp);
+            }
+            httpClient.setThreadPool(getHttpClientThreadPool());
+        }
         return httpClient;
     }
 
@@ -289,6 +334,30 @@
         this.httpClient = httpClient;
     }
 
+    public ThreadPool getHttpClientThreadPool() {
+        return httpClientThreadPool;
+    }
+
+    public void setHttpClientThreadPool(ThreadPool httpClientThreadPool) {
+        this.httpClientThreadPool = httpClientThreadPool;
+    }
+
+    public Integer getHttpClientMinThreads() {
+        return httpClientMinThreads;
+    }
+
+    public void setHttpClientMinThreads(Integer httpClientMinThreads) {
+        this.httpClientMinThreads = httpClientMinThreads;
+    }
+
+    public Integer getHttpClientMaxThreads() {
+        return httpClientMaxThreads;
+    }
+
+    public void setHttpClientMaxThreads(Integer httpClientMaxThreads) {
+        this.httpClientMaxThreads = httpClientMaxThreads;
+    }
+
     // Implementation methods
     // 
-------------------------------------------------------------------------
     protected CamelServlet createServletForConnector(Server server, Connector 
connector, List<Handler> handlers) throws Exception {
@@ -320,4 +389,28 @@
         server.start();
         return server;
     }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (httpClientThreadPool != null && httpClientThreadPool instanceof 
LifeCycle) {
+            LifeCycle lc = (LifeCycle) httpClientThreadPool;
+            lc.start();
+        }
+        if (httpClient != null && !httpClient.isStarted()) {
+            httpClient.start();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (httpClient != null) {
+            httpClient.stop();
+        }
+        if (httpClientThreadPool != null && httpClientThreadPool instanceof 
LifeCycle) {
+            LifeCycle lc = (LifeCycle) httpClientThreadPool;
+            lc.stop();
+        }
+    }
 }

Modified: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java
 Mon Nov  9 10:16:08 2009
@@ -23,15 +23,9 @@
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.component.http.HttpClientConfigurer;
 import org.apache.camel.component.http.HttpConsumer;
 import org.apache.camel.component.http.HttpEndpoint;
-import org.apache.commons.httpclient.HttpConnectionManager;
-import org.apache.commons.httpclient.params.HttpClientParams;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.mortbay.jetty.Handler;
-import org.mortbay.jetty.client.Address;
 import org.mortbay.jetty.client.HttpClient;
 
 /**
@@ -39,14 +33,11 @@
  */
 public class JettyHttpEndpoint extends HttpEndpoint {
 
-    private static final transient Log LOG = 
LogFactory.getLog(JettyHttpEndpoint.class);
     private boolean sessionSupport;
     private List<Handler> handlers;
-
-    public JettyHttpEndpoint(JettyHttpComponent component, String uri, URI 
httpURL, HttpClientParams clientParams,
-                             HttpConnectionManager httpConnectionManager, 
HttpClientConfigurer clientConfigurer) throws URISyntaxException {
-        super(uri, component, httpURL, clientParams, httpConnectionManager, 
clientConfigurer);
-    }
+    private HttpClient client;
+    private boolean synchronous = true;
+    private int concurrentConsumers = 1;
 
     public JettyHttpEndpoint(JettyHttpComponent component, String uri, URI 
httpURL) throws URISyntaxException {
         super(uri, component, httpURL);
@@ -59,7 +50,7 @@
 
     @Override
     public Producer createProducer() throws Exception {
-        return new JettyHttpProducer(this);
+        return new JettyHttpProducer(this, getClient());
     }
 
     @Override
@@ -83,30 +74,31 @@
         this.handlers = handlers;
     }
 
-    /**
-     * Factory method used by producers and consumers to create a new 
{...@link org.apache.commons.httpclient.HttpClient} instance
-     */
-    public HttpClient getJettyHttpClient() throws Exception {
-        HttpClient answer = getComponent().getHttpClient();
-        if (answer == null) {
-            answer = new HttpClient();
-            answer.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
-
-            if (System.getProperty("http.proxyHost") != null && 
System.getProperty("http.proxyPort") != null) {
-                String host = System.getProperty("http.proxyHost");
-                int port = 
Integer.parseInt(System.getProperty("http.proxyPort"));
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Java System Property http.proxyHost and 
http.proxyPort detected. Using http proxy host: "
-                            + host + " port: " + port);
-                }
-                answer.setProxy(new Address(host, port));
-            }
+    public HttpClient getClient() {
+        if (client == null) {
+            return getComponent().getHttpClient();
+        }
+        return client;
+    }
 
-            // TODO: allow jetty producer configuration from uri
+    public void setClient(HttpClient client) {
+        this.client = client;
+    }
 
-            answer.start();
-        }
-        return answer;
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    public void setSynchronous(boolean synchronous) {
+        this.synchronous = synchronous;
+    }
+
+    public int getConcurrentConsumers() {
+        return concurrentConsumers;
+    }
+
+    public void setConcurrentConsumers(int concurrentConsumers) {
+        this.concurrentConsumers = concurrentConsumers;
     }
     
 }

Modified: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
 Mon Nov  9 10:16:08 2009
@@ -19,14 +19,21 @@
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.http.HttpMethods;
 import org.apache.camel.component.http.helper.HttpProducerHelper;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.mortbay.jetty.client.HttpClient;
@@ -34,12 +41,18 @@
 /**
  * @version $Revision$
  */
-public class JettyHttpProducer extends DefaultProducer {
+public class JettyHttpProducer extends DefaultProducer implements Runnable {
     private static final transient Log LOG = 
LogFactory.getLog(JettyHttpProducer.class);
-    private boolean throwException;
+    private final BlockingQueue<Exchange> completeTasks = new 
LinkedBlockingQueue<Exchange>();
+    private ExecutorService executor;
+    private final HttpClient client;
 
-    public JettyHttpProducer(Endpoint endpoint) {
+    // TODO: support that bridge option
+    // TODO: more unit tests
+
+    public JettyHttpProducer(Endpoint endpoint, HttpClient client) {
         super(endpoint);
+        this.client = client;
     }
 
     @Override
@@ -48,22 +61,47 @@
     }
 
     public void process(Exchange exchange) throws Exception {
-        HttpClient client = getEndpoint().getJettyHttpClient();
+        HttpClient client = getEndpoint().getClient();
 
         JettyContentExchange httpExchange = createHttpExchange(exchange);
 
+        if (getEndpoint().isSynchronous()) {
+            sendSynchronous(exchange, client, httpExchange);
+        } else {
+            sendAsynchronous(exchange, client, httpExchange);
+        }
+    }
+
+    protected void sendAsynchronous(final Exchange exchange, final HttpClient 
client, final JettyContentExchange httpExchange) throws IOException {
+        // use a new copy of the exchange to route async and handover the on 
completion to the new copy
+        // so its the new copy that performs the on completion callback when 
its done
+        final Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, 
true);
+        // the copy must be an in ouy MEP
+        copy.setPattern(ExchangePattern.InOut);
+
+        // configure http exchange to signal when its complete
+        httpExchange.setCompleteTasks(completeTasks);
+        httpExchange.setExchange(copy);
+
         // set the body with the message holder
-        exchange.setOut(new JettyHttpMessage(exchange, httpExchange, 
getEndpoint().isThrowExceptionOnFailure()));
+        copy.setOut(new JettyHttpMessage(exchange, httpExchange, 
getEndpoint().isThrowExceptionOnFailure()));
 
-        // will send it async
-        sendExchange(client, httpExchange);
+        doSendExchange(client, httpExchange);
 
-        // TODO: configuration of http client as getter/setters and URI on 
component
-        // TODO: support that bridge option
-        // TODO: more unit tests
+        // now we need to let the original exchange to stop
+        // and let that copy exchange continue
+        // TODO: Use something that marks it as async routed
+        exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
     }
 
-    protected void sendExchange(HttpClient client, JettyContentExchange 
httpExchange) throws IOException {
+    protected void sendSynchronous(Exchange exchange, HttpClient client, 
JettyContentExchange httpExchange) throws IOException {
+        // set the body with the message holder
+        exchange.setOut(new JettyHttpMessage(exchange, httpExchange, 
getEndpoint().isThrowExceptionOnFailure()));
+
+        doSendExchange(client, httpExchange);
+    }
+
+    protected void doSendExchange(HttpClient client, JettyContentExchange 
httpExchange) throws IOException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Sending HTTP request to: " + httpExchange.getUrl());
         }
@@ -103,4 +141,56 @@
         }
     }
 
+    public void run() {
+        while (isRunAllowed()) {
+            Exchange exchange;
+            try {
+                // TODO: Wonder if we can use take instead of poll with 
timeout?
+                exchange = completeTasks.poll(1000, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                LOG.debug("Sleep interrupted, are we stopping? " + 
(isStopping() || isStopped()));
+                continue;
+            }
+
+            if (exchange != null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Continue to route Exchange: " + exchange);
+                }
+
+                // TODO: hook into exiting route path
+                
exchange.getContext().createProducerTemplate().send("mock:result", exchange);
+            }
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        client.start();
+
+        // this is only needed if we are asynchronous where we need to have a 
thread pool of listeners
+        // that will process the completed tasks
+        if (!getEndpoint().isSynchronous()) {
+            int poolSize = getEndpoint().getConcurrentConsumers();
+            executor = ExecutorServiceHelper.newFixedThreadPool(poolSize, 
getEndpoint().getEndpointUri(), true);
+            for (int i = 0; i < poolSize; i++) {
+                executor.execute(this);
+            }
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        client.stop();
+
+        if (executor != null) {
+            executor.shutdownNow();
+            executor = null;
+        }
+        completeTasks.clear();
+    }
+
 }

Added: 
camel/trunk/components/camel-jetty/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/resources/META-INF/services/org/apache/camel/TypeConverter?rev=834008&view=auto
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
 (added)
+++ 
camel/trunk/components/camel-jetty/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
 Mon Nov  9 10:16:08 2009
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.camel.component.jetty
\ No newline at end of file

Copied: 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java
 (from r833983, 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java?p2=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java&p1=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java&r1=833983&r2=834008&rev=834008&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java
 Mon Nov  9 10:16:08 2009
@@ -16,35 +16,40 @@
  */
 package org.apache.camel.component.jetty.jettyproducer;
 
-import java.util.concurrent.Future;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
 /**
  * @version $Revision$
  */
-public class JettyHttpProducerSlowResponseTest extends CamelTestSupport {
+public class JettyHttpProducerAsynchronousTest extends CamelTestSupport {
 
-    private String url = "jetty://http://0.0.0.0:9123/foo";;
+    private static String thread1;
+    private static String thread2;
+
+    private String url = 
"jetty://http://0.0.0.0:9123/foo?synchronous=false&concurrentConsumers=5";;
 
     @Test
-    public void testSlowReply() throws Exception {
-        Exchange exchange = template.request(url, null);
-        assertNotNull(exchange);
-
-        Future<String> future = exchange.getOut().getBody(Future.class);
-        assertNotNull(future);
-        assertEquals(false, future.isDone());
+    public void testAsynchronous() throws Exception {
+        thread1 = "";
+        thread2 = "";
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.message(0).outBody().isEqualTo("Bye World");
 
-        String reply = future.get();
-        assertEquals("Bye World", reply);
+        Object body = null;
+        template.sendBody("direct:start", body);
 
-        assertEquals(3, exchange.getOut().getHeaders().size());
+        assertMockEndpointsSatisfied();
+
+        assertNotSame("Should not use same threads", thread1, thread2);
     }
 
     @Override
@@ -52,6 +57,16 @@
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+                from("direct:start").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        thread1 = Thread.currentThread().getName();
+                    }
+                }).to(url).process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        thread2 = Thread.currentThread().getName();
+                    }
+                }).to("mock:result");
+
                 from(url).process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         HttpServletResponse res = 
exchange.getIn().getBody(HttpServletResponse.class);

Copied: 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java
 (from r833429, 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java?p2=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java&p1=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java&r1=833429&r2=834008&rev=834008&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java
 Mon Nov  9 10:16:08 2009
@@ -16,29 +16,26 @@
  */
 package org.apache.camel.component.jetty.jettyproducer;
 
-import java.util.concurrent.Future;
-
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
 /**
  * @version $Revision$
  */
-public class JettyHttpProducerGoogleTest extends CamelTestSupport {
+public class JettyHttpProducerGoogleAsynchronousTest extends CamelTestSupport {
 
     @Test
-    public void testGoogleFrontPage() throws Exception {
-        String reply = template.requestBody("direct:start", null, 
String.class);
-        assertNotNull(reply);
-    }
+    public void testGoogleFrontPageAsync() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.message(0).outBody(String.class).contains("google");
 
-    @Test
-    public void testGoogleFrontPageFutureTask() throws Exception {
-        Object body = null;
-        Future<String> reply = (Future<String>) 
template.requestBody("direct:start", body);
-        assertNotNull(reply);
-        assertNotNull(reply.get());
+        template.sendBody("direct:start", null);
+        System.out.println("I am not blocked");
+
+        assertMockEndpointsSatisfied();
     }
 
     @Override
@@ -46,10 +43,11 @@
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // to prevent redirect being thrown as an exception
-                
from("direct:start").to("jetty://http://www.google.com?throwExceptionOnFailure=false";);
+                from("direct:start")
+                    // to prevent redirect being thrown as an exception
+                    
.to("jetty://http://www.google.com?throwExceptionOnFailure=false&synchronous=false&concurrentConsumers=5";)
+                    .to("mock:result");
             }
         };
     }
-}
-
+}
\ No newline at end of file

Modified: 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java
 Mon Nov  9 10:16:08 2009
@@ -47,7 +47,7 @@
             @Override
             public void configure() throws Exception {
                 // to prevent redirect being thrown as an exception
-                
from("direct:start").to("jetty://http://www.google.com?throwExceptionOnFailure=false";);
+                
from("direct:start").to("jetty://http://www.google.com?throwExceptionOnFailure=false&synchronous=true";);
             }
         };
     }

Modified: 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerHeaderBasedCBRTestTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerHeaderBasedCBRTestTest.java?rev=834008&r1=834007&r2=834008&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerHeaderBasedCBRTestTest.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerHeaderBasedCBRTestTest.java
 Mon Nov  9 10:16:08 2009
@@ -31,8 +31,8 @@
  */
 public class JettyHttpProducerHeaderBasedCBRTestTest extends CamelTestSupport {
 
-    private String url = "jetty://http://0.0.0.0:9123/foo";;
     private static String step;
+    private String url = "jetty://http://0.0.0.0:9123/foo";;
 
     @Test
     @SuppressWarnings("unchecked")
@@ -49,6 +49,7 @@
         });
 
         template.sendBody("direct:start", "Hello World");
+        step += "D";
 
         assertMockEndpointsSatisfied();
 
@@ -58,7 +59,7 @@
         assertEquals("Bye World", future.get());
 
         // and ensure the we could CBR on the header before we got the reply 
body
-        assertEquals("ACB", step);
+        assertTrue("Should be either ACDB or ADCB was " + step, 
step.equals("ACDB") || step.equals("ADCB"));
     }
 
     @Override


Reply via email to