This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 57fb955 camel-platform-http-vertx: handle requests using a thread from the worker pool 57fb955 is described below commit 57fb9554c7eeb54d6d707f038fc1febf8cab9cb8 Author: Luca Burgazzoli <lburgazz...@gmail.com> AuthorDate: Wed Sep 30 15:26:03 2020 +0200 camel-platform-http-vertx: handle requests using a thread from the worker pool --- .../http/vertx/VertxPlatformHttpConsumer.java | 82 ++++++++++++---- .../http/vertx/VertxPlatformHttpSupport.java | 23 ++--- .../http/vertx/VertxPlatformHttpEngineTest.java | 105 ++++++++++++++++++++- 3 files changed, 171 insertions(+), 39 deletions(-) diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java index 8c0942d7..125a4ed 100644 --- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java +++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java @@ -26,6 +26,7 @@ import java.util.regex.Pattern; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; import io.vertx.ext.web.FileUpload; @@ -106,25 +107,7 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer { newRoute.handler(handler); } - newRoute.handler( - ctx -> { - Exchange exchg = null; - try { - final Exchange exchange = exchg = toExchange(ctx); - createUoW(exchange); - getAsyncProcessor().process( - exchange, - doneSync -> writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy())); - } catch (Exception e) { - ctx.fail(e); - getExceptionHandler().handleException("Failed handling platform-http endpoint " + 
endpoint.getPath(), - exchg, e); - } finally { - if (exchg != null) { - doneUoW(exchg); - } - } - }); + newRoute.handler(this::handleRequest); this.route = newRoute; } @@ -163,6 +146,67 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer { return PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1"); } + private void handleRequest(RoutingContext ctx) { + final Vertx vertx = ctx.vertx(); + final Exchange exchange = toExchange(ctx); + + // + // We do not know if any of the processing logic of the route is synchronous or not so we + // need to process the request on a thread on the Vert.x worker pool. + // + // As an example, assuming the platform-http component is configured as the transport provider + // for the rest dsl, then the following code may result in a blocking operation that could + // block Vert.x event-loop for too long if the target service takes long to respond, for + // example in case the service is a knative service scaled to zero that could take some time + // to become available: + // + // rest("/results") + // .get("/{id}") + // .route() + // .removeHeaders("*", "CamelHttpPath") + // .to("rest:get:?bridgeEndpoint=true"); + // + vertx.executeBlocking( + promise -> { + try { + createUoW(exchange); + } catch (Exception e) { + promise.fail(e); + return; + } + + getAsyncProcessor().process(exchange, c -> { + if (!exchange.isFailed()) { + promise.complete(); + } else { + promise.fail(exchange.getException()); + } + }); + }, + false, + result -> { + try { + if (result.succeeded()) { + try { + writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy()); + } catch (Exception e) { + getExceptionHandler().handleException( + "Failed handling platform-http endpoint " + getEndpoint().getPath(), + e); + } + } else { + getExceptionHandler().handleException( + "Failed handling platform-http endpoint " + getEndpoint().getPath(), + result.cause()); + + ctx.fail(result.cause()); + } + } finally { + doneUoW(exchange); + } + }); + } + private 
Exchange toExchange(RoutingContext ctx) { final Exchange exchange = getEndpoint().createExchange(); final Message in = toCamelMessage(ctx, exchange); diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java index d5c4cbc..728b5e4 100644 --- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java +++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.platform.http.vertx; -import java.io.IOException; import java.io.InputStream; import java.io.PrintWriter; import java.io.StringWriter; @@ -34,8 +33,6 @@ import io.vertx.core.http.HttpServerResponse; import io.vertx.ext.web.RoutingContext; import org.apache.camel.Exchange; import org.apache.camel.Message; -import org.apache.camel.NoTypeConversionAvailableException; -import org.apache.camel.TypeConversionException; import org.apache.camel.TypeConverter; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.ExchangeHelper; @@ -158,10 +155,11 @@ public final class VertxPlatformHttpSupport { return codeToUse; } - static void writeResponse(RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy) { + static void writeResponse(RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy) + throws Exception { final Object body = toHttpResponse(ctx.response(), camelExchange.getMessage(), headerFilterStrategy); - final HttpServerResponse response = ctx.response(); + if (body == null) { LOGGER.trace("No payload to send as reply for exchange: {}", camelExchange); response.end(); @@ -176,20 +174,15 @@ public final class 
VertxPlatformHttpSupport { b.appendBytes(bytes, 0, len); response.write(b); } - } catch (IOException e) { - throw new RuntimeException(e); } response.end(); } else { final TypeConverter tc = camelExchange.getContext().getTypeConverter(); - try { - final ByteBuffer bb = tc.mandatoryConvertTo(ByteBuffer.class, body); - final Buffer b = Buffer.buffer(bb.capacity()); - b.setBytes(0, bb); - response.end(b); - } catch (TypeConversionException | NoTypeConversionAvailableException e) { - throw new RuntimeException(e); - } + final ByteBuffer bb = tc.mandatoryConvertTo(ByteBuffer.class, body); + final Buffer b = Buffer.buffer(bb.capacity()); + + b.setBytes(0, bb); + response.end(b); } } diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java index 0648f1d..06b07e7 100644 --- a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java +++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java @@ -17,7 +17,9 @@ package org.apache.camel.component.platform.http.vertx; import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import io.vertx.core.VertxOptions; import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.platform.http.PlatformHttpComponent; @@ -76,6 +78,28 @@ public class VertxPlatformHttpEngineTest { } @Test + public void testEngineSetup() throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + final CamelContext context = new DefaultCamelContext(); + + try { + VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration(); + conf.setBindPort(port); + + context.addService(new 
VertxPlatformHttpServer(conf)); + context.start(); + + assertThat(VertxPlatformHttpRouter.lookup(context)).isNotNull(); + assertThat(context.getComponent("platform-http")).isInstanceOfSatisfying(PlatformHttpComponent.class, component -> { + assertThat(component.getEngine()).isInstanceOf(VertxPlatformHttpEngine.class); + }); + + } finally { + context.stop(); + } + } + + @Test public void testEngine() throws Exception { final int port = AvailablePortFinder.getNextAvailable(); final CamelContext context = new DefaultCamelContext(); @@ -99,11 +123,6 @@ public class VertxPlatformHttpEngineTest { context.start(); - assertThat(VertxPlatformHttpRouter.lookup(context)).isNotNull(); - assertThat(context.getComponent("platform-http")).isInstanceOfSatisfying(PlatformHttpComponent.class, component -> { - assertThat(component.getEngine()).isInstanceOf(VertxPlatformHttpEngine.class); - }); - given() .port(conf.getBindPort()) .when() @@ -127,6 +146,82 @@ public class VertxPlatformHttpEngineTest { } @Test + public void testSlowConsumer() throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + final CamelContext context = new DefaultCamelContext(); + + try { + VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration(); + conf.setBindPort(port); + + context.getRegistry().bind( + "vertx-options", + new VertxOptions() + .setMaxEventLoopExecuteTime(2) + .setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS)); + + context.addService(new VertxPlatformHttpServer(conf)); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("platform-http:/get") + .routeId("get") + .process(e -> Thread.sleep(TimeUnit.SECONDS.toMillis(3))) + .setBody().constant("get"); + } + }); + + context.start(); + + given() + .port(conf.getBindPort()) + .when() + .get("/get") + .then() + .statusCode(200) + .body(equalTo("get")); + + } finally { + context.stop(); + } + } + + @Test + public void testFailingConsumer() throws 
Exception { + final int port = AvailablePortFinder.getNextAvailable(); + final CamelContext context = new DefaultCamelContext(); + + try { + VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration(); + conf.setBindPort(port); + + context.addService(new VertxPlatformHttpServer(conf)); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("platform-http:/get") + .routeId("get") + .process(exchange -> { + throw new RuntimeException(); + }); + } + }); + + context.start(); + + given() + .port(conf.getBindPort()) + .when() + .get("/get") + .then() + .statusCode(500); + + } finally { + context.stop(); + } + } + + @Test public void testEngineSSL() throws Exception { VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration(); conf.setSslContextParameters(serverSSLParameters);