Author: davsclaus Date: Wed Nov 11 12:12:55 2009 New Revision: 834846 URL: http://svn.apache.org/viewvc?rev=834846&view=rev Log: CAMEL-2135: Added timeout handling and other kinds of failures.
Added: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java (with props) camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerTimeoutTest.java - copied, changed from r834779, camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java?rev=834846&r1=834845&r2=834846&view=diff ============================================================================== --- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java (original) +++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java Wed Nov 11 12:12:55 2009 @@ -20,16 +20,18 @@ import java.io.UnsupportedEncodingException; import java.util.LinkedHashMap; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; +import org.apache.camel.ExchangeTimedOutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.mortbay.io.Buffer; import org.mortbay.jetty.HttpHeaders; import org.mortbay.jetty.client.ContentExchange; +import org.mortbay.jetty.client.HttpClient; +import org.mortbay.jetty.client.HttpExchange; /** * Jetty 
specific exchange which keeps track of the the request and response. @@ -41,19 +43,16 @@ private static final transient Log LOG = LogFactory.getLog(JettyContentExchange.class); private final Map<String, Object> headers = new LinkedHashMap<String, Object>(); - private CountDownLatch headersComplete = new CountDownLatch(1); - private CountDownLatch bodyComplete = new CountDownLatch(1); - private volatile boolean failed; private volatile Exchange exchange; private volatile AsyncCallback callback; + private volatile JettyHttpBinding jettyBinding; + private volatile HttpClient client; - public JettyContentExchange() { - // keep headers by default - super(true); - } - - public void setExchange(Exchange exchange) { + public JettyContentExchange(Exchange exchange, JettyHttpBinding jettyBinding, HttpClient client) { + super(true); // keep headers by default this.exchange = exchange; + this.jettyBinding = jettyBinding; + this.client = client; } public void setCallback(AsyncCallback callback) { @@ -67,38 +66,23 @@ } @Override - protected void onResponseHeaderComplete() throws IOException { - headersComplete.countDown(); - if (LOG.isDebugEnabled()) { - LOG.debug("onResponseHeader for " + getUrl()); - } + protected void onResponseComplete() { + doTaskCompleted(); } @Override - protected void onResponseComplete() throws IOException { - bodyComplete.countDown(); - if (LOG.isDebugEnabled()) { - LOG.debug("onResponseComplete for " + getUrl()); - } - - if (callback != null && exchange != null) { - // signal we are complete - callback.onTaskCompleted(exchange); - } + protected void onExpire() { + doTaskCompleted(); } @Override - protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException { - super.onResponseStatus(version, status, reason); - failed = status != 200; - } - - public boolean isHeadersComplete() { - return headersComplete.getCount() == 0; + protected void onException(Throwable ex) { + doTaskCompleted(ex); } - public boolean 
isBodyComplete() { - return bodyComplete.getCount() == 0; + @Override + protected void onConnectionFailed(Throwable ex) { + doTaskCompleted(ex); } public Map<String, Object> getHeaders() { @@ -109,33 +93,48 @@ return super.getResponseContent(); } - public void waitForHeadersToComplete() throws InterruptedException { - if (LOG.isTraceEnabled()) { - LOG.trace("Waiting for headers to complete for " + getUrl()); - } - headersComplete.await(); + public String getUrl() { + String params = getRequestFields().getStringField(HttpHeaders.CONTENT_ENCODING); + return getScheme() + "//" + getAddress().toString() + getURI() + (params != null ? "?" + params : ""); } - public void waitForBodyToComplete() throws InterruptedException { - if (LOG.isTraceEnabled()) { - LOG.trace("Waiting for body to complete for " + getUrl()); + protected void doTaskCompleted() { + if (callback == null) { + // this is only for the async callback + return; } - bodyComplete.await(); - } - public boolean waitForBodyToComplete(long timeout, TimeUnit timeUnit) throws InterruptedException { - if (LOG.isTraceEnabled()) { - LOG.trace("Waiting for body to complete for " + getUrl()); + int exchangeState = getStatus(); + + if (LOG.isDebugEnabled()) { + LOG.debug("TaskComplete with state " + exchangeState + " for url: " + getUrl()); } - return bodyComplete.await(timeout, timeUnit); - } - public boolean isFailed() { - return failed; + try { + if (exchangeState == HttpExchange.STATUS_COMPLETED) { + // process the response as the state is ok + try { + jettyBinding.populateResponse(exchange, this); + } catch (Exception e) { + exchange.setException(e); + } + } else if (exchangeState == HttpExchange.STATUS_EXPIRED) { + // we did timeout + exchange.setException(new ExchangeTimedOutException(exchange, client.getTimeout())); + } else if (exchangeState == HttpExchange.STATUS_EXCEPTED) { + // some kind of other error + exchange.setException(new CamelExchangeException("JettyClient failed with state " + exchangeState, 
exchange)); + } + } finally { + // now invoke callback + callback.onTaskCompleted(exchange); + } } - public String getUrl() { - String params = getRequestFields().getStringField(HttpHeaders.CONTENT_ENCODING); - return getScheme() + "//" + getAddress().toString() + getURI() + (params != null ? "?" + params : ""); + protected void doTaskCompleted(Throwable ex) { + // some kind of other error + exchange.setException(new CamelExchangeException("JettyClient failed cause by: " + ex.getMessage(), exchange, ex)); + callback.onTaskCompleted(exchange); } + } Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java?rev=834846&r1=834845&r2=834846&view=diff ============================================================================== --- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java (original) +++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java Wed Nov 11 12:12:55 2009 @@ -22,8 +22,10 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelExchangeException; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.Message; import org.apache.camel.component.http.HttpMethods; import org.apache.camel.component.http.helper.HttpProducerHelper; @@ -34,6 +36,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.mortbay.jetty.client.HttpClient; +import org.mortbay.jetty.client.HttpExchange; /** * @version $Revision$ @@ -63,47 +66,40 @@ public void process(Exchange exchange, final AsyncCallback callback) throws Exception { HttpClient client = getEndpoint().getClient(); - final 
JettyContentExchange httpExchange = createHttpExchange(exchange); - - // wrap the original callback into another so we can populate the response - // before we signal completion to the original callback which then will start routing the exchange - AsyncCallback wrapped = new AsyncCallback() { - public void onTaskCompleted(Exchange exchange) { - // at first we must populate the response - try { - getBinding().populateResponse(exchange, httpExchange); - } catch (JettyHttpOperationFailedException e) { - // can be expected - exchange.setException(e); - } catch (Exception e) { - LOG.error("Error populating response from " + httpExchange.getUrl() + " on Exchange " + exchange, e); - exchange.setException(e); - } finally { - // now we are ready so signal completion to the original callback - callback.onTaskCompleted(exchange); - } - } - }; - - sendAsynchronous(exchange, client, httpExchange, wrapped); + JettyContentExchange httpExchange = createHttpExchange(exchange); + sendAsynchronous(exchange, client, httpExchange, callback); } protected void sendAsynchronous(final Exchange exchange, final HttpClient client, final JettyContentExchange httpExchange, final AsyncCallback callback) throws IOException { + // set the callback for the async mode httpExchange.setCallback(callback); - httpExchange.setExchange(exchange); doSendExchange(client, httpExchange); + + // the callback will handle all the response handling logic } protected void sendSynchronous(Exchange exchange, HttpClient client, JettyContentExchange httpExchange) throws Exception { doSendExchange(client, httpExchange); // we send synchronous so wait for it to be done - httpExchange.waitForDone(); - // and then process the response - getBinding().populateResponse(exchange, httpExchange); + int exchangeState = httpExchange.waitForDone(); + if (LOG.isTraceEnabled()) { + LOG.trace("HTTP exchange is done with state " + exchangeState); + } + + if (exchangeState == HttpExchange.STATUS_COMPLETED) { + // process the response 
as the state is ok + getBinding().populateResponse(exchange, httpExchange); + } else if (exchangeState == HttpExchange.STATUS_EXPIRED) { + // we did timeout + throw new ExchangeTimedOutException(exchange, client.getTimeout()); + } else if (exchangeState == HttpExchange.STATUS_EXCEPTED) { + // some kind of other error + throw new CamelExchangeException("JettyClient failed with state " + exchangeState, exchange); + } } protected JettyContentExchange createHttpExchange(Exchange exchange) throws Exception { @@ -111,7 +107,7 @@ HttpMethods methodToUse = HttpProducerHelper.createMethod(exchange, getEndpoint(), exchange.getIn().getBody() != null); String method = methodToUse.createMethod(url).getName(); - JettyContentExchange httpExchange = new JettyContentExchange(); + JettyContentExchange httpExchange = new JettyContentExchange(exchange, getBinding(), client); httpExchange.setMethod(method); httpExchange.setURL(url); Added: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java?rev=834846&view=auto ============================================================================== --- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java (added) +++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java Wed Nov 11 12:12:55 2009 @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jetty.jettyproducer; + +import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * @version $Revision$ + */ +public class JettyHttpProducerAsyncTimeoutTest extends CamelTestSupport { + + private String url = "jetty://http://0.0.0.0:9123/timeout?httpClient.timeout=2000"; + + @Test + public void testTimeout() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(0); + getMockEndpoint("mock:error").expectedMessageCount(0); + getMockEndpoint("mock:timeout").expectedMessageCount(1); + + template.sendBody("direct:start", null); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class).handled(true).to("mock:error"); + onException(ExchangeTimedOutException.class).handled(true).to("mock:timeout"); + + from("direct:start").toAsync(url).to("mock:result"); + + from(url).delay(5000).transform(constant("Bye World")); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java 
------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Copied: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerTimeoutTest.java (from r834779, camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerTimeoutTest.java?p2=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerTimeoutTest.java&p1=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java&r1=834779&r2=834846&rev=834846&view=diff ============================================================================== --- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSlowResponseTest.java (original) +++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerTimeoutTest.java Wed Nov 11 12:12:55 2009 @@ -16,10 +16,7 @@ */ package org.apache.camel.component.jetty.jettyproducer; -import javax.servlet.http.HttpServletResponse; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; +import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; @@ -27,19 +24,19 @@ /** * @version $Revision$ */ -public class JettyHttpProducerSlowResponseTest extends 
CamelTestSupport { +public class JettyHttpProducerTimeoutTest extends CamelTestSupport { - private String url = "jetty://http://0.0.0.0:9123/foo"; + private String url = "jetty://http://0.0.0.0:9123/timeout?httpClient.timeout=2000"; @Test - public void testSlowReply() throws Exception { - Exchange exchange = template.request(url, null); - assertNotNull(exchange); - - String reply = exchange.getOut().getBody(String.class); - assertEquals("Bye World", reply); - - assertEquals(4, exchange.getOut().getHeaders().size()); + public void testTimeout() throws Exception { + try { + template.request(url, null); + fail("Should have thrown a timeout exception"); + } catch (Exception e) { + ExchangeTimedOutException cause = assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause()); + assertEquals(2000, cause.getTimeout()); + } } @Override @@ -47,22 +44,7 @@ return new RouteBuilder() { @Override public void configure() throws Exception { - from(url).process(new Processor() { - public void process(Exchange exchange) throws Exception { - HttpServletResponse res = exchange.getIn().getBody(HttpServletResponse.class); - res.setStatus(200); - res.setHeader("customer", "gold"); - - // write empty string to force flushing - res.getWriter().write(""); - res.flushBuffer(); - - Thread.sleep(1000); - - res.getWriter().write("Bye World"); - res.flushBuffer(); - } - }); + from(url).delay(5000).transform(constant("Bye World")); } }; }