This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new c0a070e CAMEL-16366: camel-jetty - Jetty consumer supports exchange pooling c0a070e is described below commit c0a070e8bd0810d7b2713991ddb7263b38dbcad0 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Apr 12 07:59:28 2021 +0200 CAMEL-16366: camel-jetty - Jetty consumer supports exchange pooling --- .../org/apache/camel/http/common/HttpMessage.java | 38 +++++++++++++++------- .../component/jetty/CamelContinuationServlet.java | 33 +++++++++++++++---- 2 files changed, 53 insertions(+), 18 deletions(-) diff --git a/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpMessage.java b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpMessage.java index bd9d848..f0a21ba 100644 --- a/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpMessage.java +++ b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpMessage.java @@ -28,14 +28,31 @@ import org.apache.camel.util.ObjectHelper; public class HttpMessage extends DefaultMessage { - private final HttpServletRequest request; - private final HttpServletResponse response; - private final HttpCommonEndpoint endpoint; + private HttpServletRequest request; + private HttpServletResponse response; + private HttpCommonEndpoint endpoint; private boolean requestRead; public HttpMessage(Exchange exchange, HttpCommonEndpoint endpoint, HttpServletRequest request, HttpServletResponse response) { super(exchange); + init(exchange, endpoint, request, response); + } + + private HttpMessage(HttpServletRequest request, HttpServletResponse response, Exchange exchange, + HttpCommonEndpoint endpoint, + boolean requestRead) { + super(exchange); + this.request = request; + this.response = response; + this.endpoint = endpoint; + this.requestRead = requestRead; + } + + public void init( + Exchange exchange, HttpCommonEndpoint endpoint, HttpServletRequest request, + 
HttpServletResponse response) { + setExchange(exchange); this.requestRead = false; this.endpoint = endpoint; @@ -56,14 +73,13 @@ public class HttpMessage extends DefaultMessage { endpoint.getHttpBinding().readRequest(request, this); } - private HttpMessage(HttpServletRequest request, HttpServletResponse response, Exchange exchange, - HttpCommonEndpoint endpoint, - boolean requestRead) { - super(exchange); - this.request = request; - this.response = response; - this.endpoint = endpoint; - this.requestRead = requestRead; + @Override + public void reset() { + super.reset(); + request = null; + response = null; + endpoint = null; + requestRead = false; } public HttpServletRequest getRequest() { diff --git a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java index 65db856..a1b9a4a 100644 --- a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java +++ b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java @@ -30,12 +30,15 @@ import javax.servlet.http.HttpServletResponse; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExtendedExchange; +import org.apache.camel.Message; import org.apache.camel.http.common.CamelServlet; import org.apache.camel.http.common.HttpCommonEndpoint; import org.apache.camel.http.common.HttpConstants; import org.apache.camel.http.common.HttpConsumer; import org.apache.camel.http.common.HttpHelper; import org.apache.camel.http.common.HttpMessage; +import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.ObjectHelper; import org.apache.camel.util.UnsafeUriCharactersEncoder; import org.eclipse.jetty.continuation.Continuation; @@ -182,7 +185,7 @@ public class 
CamelContinuationServlet extends CamelServlet { // a new request so create an exchange // must be prototype scoped (not pooled) so we create the exchange via endpoint - final Exchange exchange = endpoint.createExchange(); + final Exchange exchange = consumer.createExchange(false); exchange.setPattern(ExchangePattern.InOut); if (consumer.getEndpoint().isBridgeEndpoint()) { @@ -195,7 +198,14 @@ public class CamelContinuationServlet extends CamelServlet { HttpHelper.setCharsetFromContentType(request.getContentType(), exchange); - exchange.setIn(new HttpMessage(exchange, consumer.getEndpoint(), request, response)); + // reuse existing http message if pooled + Message msg = exchange.getIn(); + if (msg instanceof HttpMessage) { + HttpMessage hm = (HttpMessage) msg; + hm.init(exchange, endpoint, request, response); + } else { + exchange.setIn(new HttpMessage(exchange, endpoint, request, response)); + } // set context path as header String contextPath = consumer.getEndpoint().getPath(); exchange.getIn().setHeader("CamelServletContextPath", contextPath); @@ -208,11 +218,18 @@ public class CamelContinuationServlet extends CamelServlet { continuation.setAttribute(EXCHANGE_ATTRIBUTE_ID, exchange.getExchangeId()); // we want to handle the UoW - try { - consumer.createUoW(exchange); - } catch (Exception e) { - log.error("Error processing request", e); - throw new ServletException(e); + UnitOfWork uow = exchange.getUnitOfWork(); + if (uow == null) { + try { + consumer.createUoW(exchange); + } catch (Exception e) { + log.error("Error processing request", e); + throw new ServletException(e); + } + } else if (uow.onPrepare(exchange)) { + // need to re-attach uow + ExtendedExchange ee = (ExtendedExchange) exchange; + ee.setUnitOfWork(uow); } // must suspend before we process the exchange @@ -238,6 +255,7 @@ public class CamelContinuationServlet extends CamelServlet { continuation.resume(); } else { log.warn("Cannot resume expired continuation of exchangeId: {}", 
exchange.getExchangeId()); + consumer.releaseExchange(exchange, false); } } }); @@ -270,6 +288,7 @@ public class CamelContinuationServlet extends CamelServlet { throw new ServletException(e); } finally { consumer.doneUoW(result); + consumer.releaseExchange(result, false); } }