This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/master by this push: new 00cd740 platform-http: handle requests using a thread from the worker pool 00cd740 is described below commit 00cd740abcc7e39f16ea21fca682651eebe74518 Author: Luca Burgazzoli <lburgazz...@gmail.com> AuthorDate: Wed Sep 30 18:22:01 2020 +0200 platform-http: handle requests using a thread from the worker pool --- .../http/runtime/QuarkusPlatformHttpConsumer.java | 261 ++++++++++++--------- 1 file changed, 153 insertions(+), 108 deletions(-) diff --git a/extensions/platform-http/runtime/src/main/java/org/apache/camel/quarkus/component/platform/http/runtime/QuarkusPlatformHttpConsumer.java b/extensions/platform-http/runtime/src/main/java/org/apache/camel/quarkus/component/platform/http/runtime/QuarkusPlatformHttpConsumer.java index 087ed58..cf9fc15 100644 --- a/extensions/platform-http/runtime/src/main/java/org/apache/camel/quarkus/component/platform/http/runtime/QuarkusPlatformHttpConsumer.java +++ b/extensions/platform-http/runtime/src/main/java/org/apache/camel/quarkus/component/platform/http/runtime/QuarkusPlatformHttpConsumer.java @@ -35,6 +35,7 @@ import java.util.regex.Pattern; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServerRequest; @@ -71,10 +72,10 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer { private final Router router; private final List<Handler<RoutingContext>> handlers; - private Route route; private final String fileNameExtWhitelist; private final UploadAttacher uploadAttacher; private final Pattern PATH_PARAMETER_PATTERN = Pattern.compile("\\{([^/}]+)\\}"); + private Route route; public QuarkusPlatformHttpConsumer(PlatformHttpEndpoint endpoint, Processor processor, Router router, List<Handler<RoutingContext>> handlers, UploadAttacher uploadAttacher) { @@ -86,81 +87,6 @@ public class 
QuarkusPlatformHttpConsumer extends DefaultConsumer { this.uploadAttacher = uploadAttacher; } - @Override - public PlatformHttpEndpoint getEndpoint() { - return (PlatformHttpEndpoint) super.getEndpoint(); - } - - @Override - protected void doStart() throws Exception { - super.doStart(); - - final PlatformHttpEndpoint endpoint = getEndpoint(); - final String path = endpoint.getPath(); - /* Transform from the Camel path param syntax /path/{key} to vert.x web's /path/:key */ - final String vertxPathParamPath = PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1"); - final Route newRoute = router.route(vertxPathParamPath); - - final Set<Method> methods = Method.parseList(endpoint.getHttpMethodRestrict()); - if (!methods.equals(Method.getAll())) { - methods.stream().forEach(m -> newRoute.method(HttpMethod.valueOf(m.name()))); - } - if (endpoint.getConsumes() != null) { - newRoute.consumes(endpoint.getConsumes()); - } - if (endpoint.getProduces() != null) { - newRoute.produces(endpoint.getProduces()); - } - - handlers.forEach(newRoute::handler); - - newRoute.handler( - ctx -> { - Exchange exchg = null; - try { - final Exchange exchange = exchg = toExchange(ctx); - createUoW(exchange); - getAsyncProcessor().process( - exchange, - doneSync -> writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy())); - } catch (Exception e) { - ctx.fail(e); - getExceptionHandler().handleException("Failed handling platform-http endpoint " + path, exchg, e); - } finally { - if (exchg != null) { - doneUoW(exchg); - } - } - }); - - this.route = newRoute; - } - - @Override - protected void doStop() throws Exception { - if (route != null) { - route.remove(); - route = null; - } - super.doStop(); - } - - @Override - protected void doSuspend() throws Exception { - if (route != null) { - route.disable(); - } - super.doSuspend(); - } - - @Override - protected void doResume() throws Exception { - if (route != null) { - route.enable(); - } - super.doResume(); - } - static Object 
toHttpResponse(HttpServerResponse response, Message message, HeaderFilterStrategy headerFilterStrategy) { final Exchange exchange = message.getExchange(); @@ -307,20 +233,6 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer { } - Exchange toExchange(RoutingContext ctx) { - final Exchange exchange = getEndpoint().createExchange(); - Message in = toCamelMessage(ctx, exchange); - - final String charset = ctx.parsedHeaders().contentType().parameter("charset"); - if (charset != null) { - exchange.setProperty(Exchange.CHARSET_NAME, charset); - in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, charset); - } - - exchange.setIn(in); - return exchange; - } - static void populateCamelHeaders( RoutingContext ctx, Map<String, Object> headersMap, @@ -377,6 +289,157 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer { headersMap.put(Exchange.HTTP_RAW_QUERY, request.query()); } + @SuppressWarnings("unchecked") + static void appendHeader(Map<String, Object> headers, String key, Object value) { + if (headers.containsKey(key)) { + Object existing = headers.get(key); + List<Object> list; + if (existing instanceof List) { + list = (List<Object>) existing; + } else { + list = new ArrayList<>(); + list.add(existing); + } + list.add(value); + value = list; + } + + headers.put(key, value); + } + + @Override + public PlatformHttpEndpoint getEndpoint() { + return (PlatformHttpEndpoint) super.getEndpoint(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + final PlatformHttpEndpoint endpoint = getEndpoint(); + final String path = endpoint.getPath(); + /* Transform from the Camel path param syntax /path/{key} to vert.x web's /path/:key */ + final String vertxPathParamPath = PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1"); + final Route newRoute = router.route(vertxPathParamPath); + + final Set<Method> methods = Method.parseList(endpoint.getHttpMethodRestrict()); + if (!methods.equals(Method.getAll())) { + 
methods.stream().forEach(m -> newRoute.method(HttpMethod.valueOf(m.name()))); + } + if (endpoint.getConsumes() != null) { + newRoute.consumes(endpoint.getConsumes()); + } + if (endpoint.getProduces() != null) { + newRoute.produces(endpoint.getProduces()); + } + + handlers.forEach(newRoute::handler); + + newRoute.handler(this::handleRequest); + + this.route = newRoute; + } + + @Override + protected void doStop() throws Exception { + if (route != null) { + route.remove(); + route = null; + } + super.doStop(); + } + + @Override + protected void doSuspend() throws Exception { + if (route != null) { + route.disable(); + } + super.doSuspend(); + } + + @Override + protected void doResume() throws Exception { + if (route != null) { + route.enable(); + } + super.doResume(); + } + + private void handleRequest(RoutingContext ctx) { + final Vertx vertx = ctx.vertx(); + final Exchange exchange = toExchange(ctx); + + // + // We do not know if any of the processing logic of the route is synchronous or not so we + // need to process the request on a thread on the Vert.x worker pool. 
+ // + // As an example, assuming the platform-http component is configured as the transport provider + // for the rest dsl, then the following code may result in a blocking operation that could + // block Vert.x event-loop for too long if the target service takes long to respond, as + // example in case the service is a knative service scaled to zero that could take some time + // to become available: + // + // rest("/results") + // .get("/{id}") + // .route() + // .removeHeaders("*", "CamelHttpPath") + // .to("rest:get:?bridgeEndpoint=true"); + // + vertx.executeBlocking( + promise -> { + try { + createUoW(exchange); + } catch (Exception e) { + promise.fail(e); + return; + } + + getAsyncProcessor().process(exchange, c -> { + if (!exchange.isFailed()) { + promise.complete(); + } else { + promise.fail(exchange.getException()); + } + }); + }, + false, + result -> { + try { + if (result.succeeded()) { + try { + writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy()); + } catch (Exception e) { + getExceptionHandler().handleException( + "Failed handling platform-http endpoint " + getEndpoint().getPath(), + e); + } + } else { + getExceptionHandler().handleException( + "Failed handling platform-http endpoint " + getEndpoint().getPath(), + result.cause()); + + ctx.fail(result.cause()); + } + } finally { + doneUoW(exchange); + } + }); + } + + Exchange toExchange(RoutingContext ctx) { + final Exchange exchange = getEndpoint().createExchange(); + Message in = toCamelMessage(ctx, exchange); + + final String charset = ctx.parsedHeaders().contentType().parameter("charset"); + if (charset != null) { + exchange.setProperty(Exchange.CHARSET_NAME, charset); + in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, charset); + } + + exchange.setIn(in); + return exchange; + } + Message toCamelMessage(RoutingContext ctx, Exchange exchange) { final Message result = new DefaultMessage(exchange); @@ -414,24 +477,6 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer { 
return result; } - @SuppressWarnings("unchecked") - static void appendHeader(Map<String, Object> headers, String key, Object value) { - if (headers.containsKey(key)) { - Object existing = headers.get(key); - List<Object> list; - if (existing instanceof List) { - list = (List<Object>) existing; - } else { - list = new ArrayList<>(); - list.add(existing); - } - list.add(value); - value = list; - } - - headers.put(key, value); - } - void populateAttachments(Set<FileUpload> uploads, Message message) { for (FileUpload upload : uploads) { final String name = upload.name();