Author: davsclaus
Date: Wed Nov 11 12:12:55 2009
New Revision: 834846

URL: http://svn.apache.org/viewvc?rev=834846&view=rev
Log:
CAMEL-2135: Added timeout handling and other kind of failures.

Added:
    
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java
   (with props)
    
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerTimeoutTest.java
      - copied, changed from r834779, 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java
Modified:
    
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
    
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java

Modified: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java?rev=834846&r1=834845&r2=834846&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
 Wed Nov 11 12:12:55 2009
@@ -20,16 +20,18 @@
 import java.io.UnsupportedEncodingException;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.mortbay.io.Buffer;
 import org.mortbay.jetty.HttpHeaders;
 import org.mortbay.jetty.client.ContentExchange;
+import org.mortbay.jetty.client.HttpClient;
+import org.mortbay.jetty.client.HttpExchange;
 
 /**
  * Jetty specific exchange which keeps track of the the request and response.
@@ -41,19 +43,16 @@
     private static final transient Log LOG = 
LogFactory.getLog(JettyContentExchange.class);
 
     private final Map<String, Object> headers = new LinkedHashMap<String, 
Object>();
-    private CountDownLatch headersComplete = new CountDownLatch(1);
-    private CountDownLatch bodyComplete = new CountDownLatch(1);
-    private volatile boolean failed;
     private volatile Exchange exchange;
     private volatile AsyncCallback callback;
+    private volatile JettyHttpBinding jettyBinding;
+    private volatile HttpClient client;
 
-    public JettyContentExchange() {
-        // keep headers by default
-        super(true);
-    }
-
-    public void setExchange(Exchange exchange) {
+    public JettyContentExchange(Exchange exchange, JettyHttpBinding 
jettyBinding, HttpClient client) {
+        super(true); // keep headers by default
         this.exchange = exchange;
+        this.jettyBinding = jettyBinding;
+        this.client = client;
     }
 
     public void setCallback(AsyncCallback callback) {
@@ -67,38 +66,23 @@
     }
 
     @Override
-    protected void onResponseHeaderComplete() throws IOException {
-        headersComplete.countDown();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("onResponseHeader for " + getUrl());
-        }
+    protected void onResponseComplete() {
+        doTaskCompleted();
     }
 
     @Override
-    protected void onResponseComplete() throws IOException {
-        bodyComplete.countDown();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("onResponseComplete for " + getUrl());
-        }
-
-        if (callback != null && exchange != null) {
-            // signal we are complete
-            callback.onTaskCompleted(exchange);
-        }
+    protected void onExpire() {
+        doTaskCompleted();
     }
 
     @Override
-    protected void onResponseStatus(Buffer version, int status, Buffer reason) 
throws IOException {
-        super.onResponseStatus(version, status, reason);
-        failed = status != 200;
-    }
-
-    public boolean isHeadersComplete() {
-        return headersComplete.getCount() == 0;
+    protected void onException(Throwable ex) {
+        doTaskCompleted(ex);
     }
 
-    public boolean isBodyComplete() {
-        return bodyComplete.getCount() == 0;
+    @Override
+    protected void onConnectionFailed(Throwable ex) {
+        doTaskCompleted(ex);
     }
 
     public Map<String, Object> getHeaders() {
@@ -109,33 +93,48 @@
         return super.getResponseContent();
     }
 
-    public void waitForHeadersToComplete() throws InterruptedException {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Waiting for headers to complete for " + getUrl());
-        }
-        headersComplete.await();
+    public String getUrl() {
+        String params = 
getRequestFields().getStringField(HttpHeaders.CONTENT_ENCODING);
+        return getScheme() + "//" + getAddress().toString() + getURI() + 
(params != null ? "?" + params : "");
     }
 
-    public void waitForBodyToComplete() throws InterruptedException {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Waiting for body to complete for " + getUrl());
+    protected void doTaskCompleted() {
+        if (callback == null) {
+            // this is only for the async callback
+            return;
         }
-        bodyComplete.await();
-    }
 
-    public boolean waitForBodyToComplete(long timeout, TimeUnit timeUnit) 
throws InterruptedException {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Waiting for body to complete for " + getUrl());
+        int exchangeState = getStatus();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("TaskComplete with state " + exchangeState + " for url: 
" + getUrl());
         }
-        return bodyComplete.await(timeout, timeUnit);
-    }
 
-    public boolean isFailed() {
-        return failed;
+        try {
+            if (exchangeState == HttpExchange.STATUS_COMPLETED) {
+                // process the response as the state is ok
+                try {
+                    jettyBinding.populateResponse(exchange, this);
+                } catch (Exception e) {
+                    exchange.setException(e);
+                }
+            } else if (exchangeState == HttpExchange.STATUS_EXPIRED) {
+                // we did timeout
+                exchange.setException(new ExchangeTimedOutException(exchange, 
client.getTimeout()));
+            } else if (exchangeState == HttpExchange.STATUS_EXCEPTED) {
+                // some kind of other error
+                exchange.setException(new CamelExchangeException("JettyClient 
failed with state " + exchangeState, exchange));
+            }
+        } finally {
+            // now invoke callback
+            callback.onTaskCompleted(exchange);
+        }
     }
 
-    public String getUrl() {
-        String params = 
getRequestFields().getStringField(HttpHeaders.CONTENT_ENCODING);
-        return getScheme() + "//" + getAddress().toString() + getURI() + 
(params != null ? "?" + params : "");
+    protected void doTaskCompleted(Throwable ex) {
+        // some kind of other error
+        exchange.setException(new CamelExchangeException("JettyClient failed 
cause by: " + ex.getMessage(), exchange, ex));
+        callback.onTaskCompleted(exchange);
     }
+
 }

Modified: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java?rev=834846&r1=834845&r2=834846&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
 Wed Nov 11 12:12:55 2009
@@ -22,8 +22,10 @@
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.Message;
 import org.apache.camel.component.http.HttpMethods;
 import org.apache.camel.component.http.helper.HttpProducerHelper;
@@ -34,6 +36,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.mortbay.jetty.client.HttpClient;
+import org.mortbay.jetty.client.HttpExchange;
 
 /**
  * @version $Revision$
@@ -63,47 +66,40 @@
     public void process(Exchange exchange, final AsyncCallback callback) 
throws Exception {
         HttpClient client = getEndpoint().getClient();
 
-        final JettyContentExchange httpExchange = createHttpExchange(exchange);
-
-        // wrap the original callback into another so we can populate the 
response
-        // before we signal completion to the original callback which then 
will start routing the exchange
-        AsyncCallback wrapped = new AsyncCallback() {
-            public void onTaskCompleted(Exchange exchange) {
-                // at first we must populate the response
-                try {
-                    getBinding().populateResponse(exchange, httpExchange);
-                } catch (JettyHttpOperationFailedException e) {
-                    // can be expected
-                    exchange.setException(e);
-                } catch (Exception e) {
-                    LOG.error("Error populating response from " + 
httpExchange.getUrl() + " on Exchange " + exchange, e);
-                    exchange.setException(e);
-                } finally {
-                    // now we are ready so signal completion to the original 
callback
-                    callback.onTaskCompleted(exchange);
-                }
-            }
-        };
-
-        sendAsynchronous(exchange, client, httpExchange, wrapped);
+        JettyContentExchange httpExchange = createHttpExchange(exchange);
+        sendAsynchronous(exchange, client, httpExchange, callback);
     }
 
     protected void sendAsynchronous(final Exchange exchange, final HttpClient 
client, final JettyContentExchange httpExchange,
                                     final AsyncCallback callback) throws 
IOException {
 
+        // set the callback for the async mode
         httpExchange.setCallback(callback);
-        httpExchange.setExchange(exchange);
 
         doSendExchange(client, httpExchange);
+
+        // the callback will handle all the response handling logic
     }
 
     protected void sendSynchronous(Exchange exchange, HttpClient client, 
JettyContentExchange httpExchange) throws Exception {
         doSendExchange(client, httpExchange);
 
         // we send synchronous so wait for it to be done
-        httpExchange.waitForDone();
-        // and then process the response
-        getBinding().populateResponse(exchange, httpExchange);
+        int exchangeState = httpExchange.waitForDone();
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("HTTP exchange is done with state " + exchangeState);
+        }
+
+        if (exchangeState == HttpExchange.STATUS_COMPLETED) {
+            // process the response as the state is ok
+            getBinding().populateResponse(exchange, httpExchange);
+        } else if (exchangeState == HttpExchange.STATUS_EXPIRED) {
+            // we did timeout
+            throw new ExchangeTimedOutException(exchange, client.getTimeout());
+        } else if (exchangeState == HttpExchange.STATUS_EXCEPTED) {
+            // some kind of other error
+            throw new CamelExchangeException("JettyClient failed with state " 
+ exchangeState, exchange);
+        }
     }
 
     protected JettyContentExchange createHttpExchange(Exchange exchange) 
throws Exception {
@@ -111,7 +107,7 @@
         HttpMethods methodToUse = HttpProducerHelper.createMethod(exchange, 
getEndpoint(), exchange.getIn().getBody() != null);
         String method = methodToUse.createMethod(url).getName();
 
-        JettyContentExchange httpExchange = new JettyContentExchange();
+        JettyContentExchange httpExchange = new JettyContentExchange(exchange, 
getBinding(), client);
         httpExchange.setMethod(method);
         httpExchange.setURL(url);
 

Added: 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java?rev=834846&view=auto
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java
 (added)
+++ 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java
 Wed Nov 11 12:12:55 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jetty.jettyproducer;
+
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class JettyHttpProducerAsyncTimeoutTest extends CamelTestSupport {
+
+    private String url = 
"jetty://http://0.0.0.0:9123/timeout?httpClient.timeout=2000";;
+
+    @Test
+    public void testTimeout() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        getMockEndpoint("mock:error").expectedMessageCount(0);
+        getMockEndpoint("mock:timeout").expectedMessageCount(1);
+
+        template.sendBody("direct:start", null);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class).handled(true).to("mock:error");
+                
onException(ExchangeTimedOutException.class).handled(true).to("mock:timeout");
+
+                from("direct:start").toAsync(url).to("mock:result");
+
+                from(url).delay(5000).transform(constant("Bye World"));
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerTimeoutTest.java
 (from r834779, 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerTimeoutTest.java?p2=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerTimeoutTest.java&p1=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java&r1=834779&r2=834846&rev=834846&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerTimeoutTest.java
 Wed Nov 11 12:12:55 2009
@@ -16,10 +16,7 @@
  */
 package org.apache.camel.component.jetty.jettyproducer;
 
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
@@ -27,19 +24,19 @@
 /**
  * @version $Revision$
  */
-public class JettyHttpProducerSlowResponseTest extends CamelTestSupport {
+public class JettyHttpProducerTimeoutTest extends CamelTestSupport {
 
-    private String url = "jetty://http://0.0.0.0:9123/foo";;
+    private String url = 
"jetty://http://0.0.0.0:9123/timeout?httpClient.timeout=2000";;
 
     @Test
-    public void testSlowReply() throws Exception {
-        Exchange exchange = template.request(url, null);
-        assertNotNull(exchange);
-
-        String reply = exchange.getOut().getBody(String.class);
-        assertEquals("Bye World", reply);
-
-        assertEquals(4, exchange.getOut().getHeaders().size());
+    public void testTimeout() throws Exception {
+        try {
+            template.request(url, null);
+            fail("Should have thrown a timeout exception");
+        } catch (Exception e) {
+            ExchangeTimedOutException cause = 
assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
+            assertEquals(2000, cause.getTimeout());
+        }
     }
 
     @Override
@@ -47,22 +44,7 @@
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from(url).process(new Processor() {
-                    public void process(Exchange exchange) throws Exception {
-                        HttpServletResponse res = 
exchange.getIn().getBody(HttpServletResponse.class);
-                        res.setStatus(200);
-                        res.setHeader("customer", "gold");
-
-                        // write empty string to force flushing
-                        res.getWriter().write("");
-                        res.flushBuffer();
-
-                        Thread.sleep(1000);
-
-                        res.getWriter().write("Bye World");
-                        res.flushBuffer();
-                    }
-                });
+                from(url).delay(5000).transform(constant("Bye World"));
             }
         };
     }


Reply via email to