Author: krasserm Date: Thu Jul 22 19:24:24 2010 New Revision: 966815 URL: http://svn.apache.org/viewvc?rev=966815&view=rev Log: Closes CAMEL-2986: IllegalStateException in CamelContinuationServlet under heavy load
Added: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java?rev=966815&r1=966814&r2=966815&view=diff ============================================================================== --- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java (original) +++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java Thu Jul 22 19:24:24 2010 @@ -38,6 +38,8 @@ import org.eclipse.jetty.continuation.Co */ public class CamelContinuationServlet extends CamelServlet { + static final String EXCHANGE_ATRRIBUTE_NAME = "CamelExchange"; + private static final long serialVersionUID = 1L; @Override @@ -50,15 +52,15 @@ public class CamelContinuationServlet ex return; } - // are we suspended? - if (consumer.isSuspended()) { + final Continuation continuation = ContinuationSupport.getContinuation(request); + + // are we suspended and a request is dispatched initially? + if (consumer.isSuspended() && continuation.isInitial()) { response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); return; } - final Continuation continuation = ContinuationSupport.getContinuation(request); if (continuation.isInitial()) { - // a new request so create an exchange final Exchange exchange = new DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut); if (consumer.getEndpoint().isBridgeEndpoint()) { @@ -69,44 +71,28 @@ public class CamelContinuationServlet ex } exchange.setIn(new HttpMessage(exchange, request, response)); + if (log.isTraceEnabled()) { + log.trace("Suspending continuation of exchangeId: " + exchange.getExchangeId()); + } + continuation.suspend(); + // use the asynchronous API to process the exchange - boolean sync = consumer.getAsyncProcessor().process(exchange, new AsyncCallback() { + consumer.getAsyncProcessor().process(exchange, new AsyncCallback() { public void done(boolean doneSync) { - // we only have to handle async completion - if (doneSync) { - return; - } - - // we should resume the continuation now that we are done asynchronously if (log.isTraceEnabled()) { log.trace("Resuming continuation of exchangeId: " + exchange.getExchangeId()); } - continuation.setAttribute("CamelExchange", exchange); + // resume processing after both, sync and async callbacks + continuation.setAttribute(EXCHANGE_ATRRIBUTE_NAME, exchange); continuation.resume(); } }); - - if (!sync) { - // wait for the exchange to get processed. - // this might block until it completes or it might return via an exception and - // then this method is re-invoked once the the exchange has finished processing - if (log.isTraceEnabled()) { - log.trace("Suspending continuation of exchangeId: " + exchange.getExchangeId()); - } - continuation.suspend(response); - return; - } - - // now lets output to the response - if (log.isTraceEnabled()) { - log.trace("Writing response of exchangeId: " + exchange.getExchangeId()); - } - consumer.getBinding().writeResponse(exchange, response); return; } if (continuation.isResumed()) { - Exchange exchange = (Exchange) continuation.getAttribute("CamelExchange"); + // a re-dispatched request containing the processing result + Exchange exchange = (Exchange) continuation.getAttribute(EXCHANGE_ATRRIBUTE_NAME); if (log.isTraceEnabled()) { log.trace("Resuming continuation of exchangeId: " + exchange.getExchangeId()); } @@ -116,8 +102,10 @@ public class CamelContinuationServlet ex log.trace("Writing response of exchangeId: " + exchange.getExchangeId()); } consumer.getBinding().writeResponse(exchange, response); - return; } + } catch (IOException e) { + log.error("Error processing request", e); + throw e; } catch (Exception e) { log.error("Error processing request", e); throw new ServletException(e); Added: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java?rev=966815&view=auto ============================================================================== --- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java (added) +++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java Thu Jul 22 19:24:24 2010 @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jetty; + +import java.io.IOException; + +import javax.servlet.FilterChain; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; + +import org.eclipse.jetty.servlets.MultiPartFilter; + +/** + * A multipart filter that processes only initially dispatched requests. + * Re-dispatched requests are ignored. + */ +class CamelMultipartFilter extends MultiPartFilter { + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { + if (request.getAttribute(CamelContinuationServlet.EXCHANGE_ATRRIBUTE_NAME) == null) { + super.doFilter(request, response, chain); + } else { + chain.doFilter(request, response); + } + } + +} Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java?rev=966815&r1=966814&r2=966815&view=diff ============================================================================== --- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java (original) +++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java Thu Jul 22 19:24:24 2010 @@ -55,7 +55,6 @@ import org.eclipse.jetty.server.ssl.SslS import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; -import org.eclipse.jetty.servlets.MultiPartFilter; import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ThreadPool; @@ -599,7 +598,7 @@ public class JettyHttpComponent extends } context.setAttribute("javax.servlet.context.tempdir", file); } - filterHolder.setFilter(new MultiPartFilter()); + filterHolder.setFilter(new CamelMultipartFilter()); //add the default MultiPartFilter filter for it context.addFilter(filterHolder, "/*", 0); context.addServlet(holder, "/*");