Author: krasserm
Date: Thu Jul 22 19:24:24 2010
New Revision: 966815

URL: http://svn.apache.org/viewvc?rev=966815&view=rev
Log:
Closes CAMEL-2986: IllegalStateException in CamelContinuationServlet under 
heavy load

Added:
    
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
Modified:
    
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
    
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java

Modified: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java?rev=966815&r1=966814&r2=966815&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java (original)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java Thu Jul 22 19:24:24 2010
@@ -38,6 +38,8 @@ import org.eclipse.jetty.continuation.Co
  */
 public class CamelContinuationServlet extends CamelServlet {
 
+    static final String EXCHANGE_ATRRIBUTE_NAME = "CamelExchange";
+
     private static final long serialVersionUID = 1L;
 
     @Override
@@ -50,15 +52,15 @@ public class CamelContinuationServlet ex
                 return;
             }
 
-            // are we suspended?
-            if (consumer.isSuspended()) {
+            final Continuation continuation = ContinuationSupport.getContinuation(request);
+
+            // are we suspended and a request is dispatched initially?
+            if (consumer.isSuspended() && continuation.isInitial()) {
                 response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
                 return;
             }
 
-            final Continuation continuation = ContinuationSupport.getContinuation(request);
             if (continuation.isInitial()) {
-
                 // a new request so create an exchange
                 final Exchange exchange = new DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut);
                 if (consumer.getEndpoint().isBridgeEndpoint()) {
@@ -69,44 +71,28 @@ public class CamelContinuationServlet ex
                 }
                 exchange.setIn(new HttpMessage(exchange, request, response));
 
+                if (log.isTraceEnabled()) {
+                    log.trace("Suspending continuation of exchangeId: " + exchange.getExchangeId());
+                }
+                continuation.suspend();
+
                 // use the asynchronous API to process the exchange
-                boolean sync = consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
+                consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
                     public void done(boolean doneSync) {
-                        // we only have to handle async completion
-                        if (doneSync) {
-                            return;
-                        }
-
-                        // we should resume the continuation now that we are done asynchronously
                         if (log.isTraceEnabled()) {
                             log.trace("Resuming continuation of exchangeId: " + exchange.getExchangeId());
                         }
-                        continuation.setAttribute("CamelExchange", exchange);
+                        // resume processing after both, sync and async callbacks
+                        continuation.setAttribute(EXCHANGE_ATRRIBUTE_NAME, exchange);
                         continuation.resume();
                     }
                 });
-
-                if (!sync) {
-                    // wait for the exchange to get processed.
-                    // this might block until it completes or it might return via an exception and
-                    // then this method is re-invoked once the the exchange has finished processing
-                    if (log.isTraceEnabled()) {
-                        log.trace("Suspending continuation of exchangeId: " + exchange.getExchangeId());
-                    }
-                    continuation.suspend(response);
-                    return;
-                }
-
-                // now lets output to the response
-                if (log.isTraceEnabled()) {
-                    log.trace("Writing response of exchangeId: " + exchange.getExchangeId());
-                }
-                consumer.getBinding().writeResponse(exchange, response);
                 return;
             }
 
             if (continuation.isResumed()) {
-                Exchange exchange = (Exchange) continuation.getAttribute("CamelExchange");
+                // a re-dispatched request containing the processing result
+                Exchange exchange = (Exchange) continuation.getAttribute(EXCHANGE_ATRRIBUTE_NAME);
                 if (log.isTraceEnabled()) {
                     log.trace("Resuming continuation of exchangeId: " + exchange.getExchangeId());
                 }
@@ -116,8 +102,10 @@ public class CamelContinuationServlet ex
                     log.trace("Writing response of exchangeId: " + exchange.getExchangeId());
                 }
                 consumer.getBinding().writeResponse(exchange, response);
-                return;
             }
+        } catch (IOException e) {
+            log.error("Error processing request", e);
+            throw e;
         } catch (Exception e) {
             log.error("Error processing request", e);
             throw new ServletException(e);

Added: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java?rev=966815&view=auto
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java (added)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java Thu Jul 22 19:24:24 2010
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jetty;
+
+import java.io.IOException;
+
+import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+
+import org.eclipse.jetty.servlets.MultiPartFilter;
+
+/**
+ * A multipart filter that processes only initially dispatched requests.
+ * Re-dispatched requests are ignored.
+ */
+class CamelMultipartFilter extends MultiPartFilter {
+
+    @Override
+    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
+        if (request.getAttribute(CamelContinuationServlet.EXCHANGE_ATRRIBUTE_NAME) == null) {
+            super.doFilter(request, response, chain);
+        } else {
+            chain.doFilter(request, response);
+        }
+    }
+
+}

Modified: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java?rev=966815&r1=966814&r2=966815&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java (original)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java Thu Jul 22 19:24:24 2010
@@ -55,7 +55,6 @@ import org.eclipse.jetty.server.ssl.SslS
 import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
-import org.eclipse.jetty.servlets.MultiPartFilter;
 import org.eclipse.jetty.util.component.LifeCycle;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.eclipse.jetty.util.thread.ThreadPool;
@@ -599,7 +598,7 @@ public class JettyHttpComponent extends 
             }
             context.setAttribute("javax.servlet.context.tempdir", file);
         }
-        filterHolder.setFilter(new MultiPartFilter());
+        filterHolder.setFilter(new CamelMultipartFilter());
         //add the default MultiPartFilter filter for it
         context.addFilter(filterHolder, "/*", 0);
         context.addServlet(holder, "/*");


Reply via email to