This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch camel-4.8.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push: new fdf54dc4087 CAMEL-21528: Avoid blocking the event loop in vertx-http response handlers fdf54dc4087 is described below commit fdf54dc4087b5897f4372b8ea2b891e7728cd4d1 Author: James Netherton <jamesnether...@gmail.com> AuthorDate: Tue Dec 10 10:43:30 2024 +0000 CAMEL-21528: Avoid blocking the event loop in vertx-http response handlers --- .../component/vertx/http/VertxHttpProducer.java | 9 ++- .../vertx/http/VertxHttpSlowErrorHandlerTest.java | 80 ++++++++++++++++++++++ .../component/vertx/http/VertxHttpTestSupport.java | 38 ++++++++++ 3 files changed, 125 insertions(+), 2 deletions(-) diff --git a/components/camel-vertx/camel-vertx-http/src/main/java/org/apache/camel/component/vertx/http/VertxHttpProducer.java b/components/camel-vertx/camel-vertx-http/src/main/java/org/apache/camel/component/vertx/http/VertxHttpProducer.java index 3d227e7b777..8330ad6c048 100644 --- a/components/camel-vertx/camel-vertx-http/src/main/java/org/apache/camel/component/vertx/http/VertxHttpProducer.java +++ b/components/camel-vertx/camel-vertx-http/src/main/java/org/apache/camel/component/vertx/http/VertxHttpProducer.java @@ -19,6 +19,7 @@ package org.apache.camel.component.vertx.http; import java.io.ByteArrayOutputStream; import java.io.Serializable; import java.util.Map; +import java.util.concurrent.Callable; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; @@ -123,7 +124,10 @@ public class VertxHttpProducer extends DefaultAsyncProducer { } private Handler<AsyncResult<HttpResponse<Buffer>>> createResultHandler(Exchange exchange, AsyncCallback callback) { - return response -> { + // Process the response on a thread from the Vert.x worker pool since there may be blocking code: + // - If a custom VertxHttpBinding is in use + // - If the Camel error handler routes to logic that contains blocking code + return response -> getEndpoint().getVertx().executeBlocking((Callable<Void>) () -> { try { vertxHttpBinding.handleResponse(getEndpoint(), exchange, response); } catch (Exception e) { @@ -131,6 +135,7 @@ public class VertxHttpProducer extends DefaultAsyncProducer { } finally { callback.done(false); } - }; + return null; + }); } } diff --git a/components/camel-vertx/camel-vertx-http/src/test/java/org/apache/camel/component/vertx/http/VertxHttpSlowErrorHandlerTest.java b/components/camel-vertx/camel-vertx-http/src/test/java/org/apache/camel/component/vertx/http/VertxHttpSlowErrorHandlerTest.java new file mode 100644 index 00000000000..8b72af67c92 --- /dev/null +++ b/components/camel-vertx/camel-vertx-http/src/test/java/org/apache/camel/component/vertx/http/VertxHttpSlowErrorHandlerTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.http; + +import io.vertx.core.Vertx; +import org.apache.camel.BindToRegistry; +import org.apache.camel.Exchange; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class VertxHttpSlowErrorHandlerTest extends VertxHttpTestSupport { + private static final BlockedThreadReporter reporter = new BlockedThreadReporter(); + private static final String SLOW_SERVICE_RESPONSE = "Slow Response"; + + @AfterEach + public void afterEach() { + reporter.reset(); + } + + @Test + void slowErrorHandlerDoesNotBlockEventLoop() throws Exception { + Exchange result = template.request(getProducerUri() + "/test", null); + assertFalse(result.isFailed()); + assertFalse(reporter.isEventLoopBlocked()); + assertEquals(SLOW_SERVICE_RESPONSE, result.getMessage().getBody(String.class)); + } + + @BindToRegistry + public Vertx createVertx() { + return createVertxWithThreadBlockedHandler(reporter); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + onException(Exception.class) + .handled(true) + .maximumRedeliveries(1) + .redeliveryDelay(0) + .to("direct:slow"); + + from(getTestServerUri() + "/test") + .to("direct:start"); + + from("direct:start") + .removeHeaders("CamelHttp*") + .to(getProducerUri()); + + from(getTestServerUri()) + .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(500)); + + from("direct:slow") + .delay(600) + .syncDelayed() + .setBody().constant(SLOW_SERVICE_RESPONSE); + } + }; + } +} diff --git a/components/camel-vertx/camel-vertx-http/src/test/java/org/apache/camel/component/vertx/http/VertxHttpTestSupport.java b/components/camel-vertx/camel-vertx-http/src/test/java/org/apache/camel/component/vertx/http/VertxHttpTestSupport.java index 9eee1154930..50d255fdb5a 100644 --- a/components/camel-vertx/camel-vertx-http/src/test/java/org/apache/camel/component/vertx/http/VertxHttpTestSupport.java +++ b/components/camel-vertx/camel-vertx-http/src/test/java/org/apache/camel/component/vertx/http/VertxHttpTestSupport.java @@ -16,6 +16,14 @@ */ package org.apache.camel.component.vertx.http; +import java.util.concurrent.TimeUnit; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.VertxException; +import io.vertx.core.VertxOptions; +import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.btc.BlockedThreadEvent; import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit5.CamelTestSupport; @@ -38,4 +46,34 @@ public class VertxHttpTestSupport extends CamelTestSupport { protected int getPort() { return port; } + + protected Vertx createVertxWithThreadBlockedHandler(Handler<BlockedThreadEvent> handler) { + VertxOptions vertxOptions = new VertxOptions(); + vertxOptions.setMaxEventLoopExecuteTime(500); + vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.MILLISECONDS); + vertxOptions.setBlockedThreadCheckInterval(10); + vertxOptions.setBlockedThreadCheckIntervalUnit(TimeUnit.MILLISECONDS); + Vertx vertx = Vertx.vertx(vertxOptions); + ((VertxInternal) vertx).blockedThreadChecker().setThreadBlockedHandler(handler); + return vertx; + } + + static final class BlockedThreadReporter implements Handler<BlockedThreadEvent> { + private volatile boolean eventLoopBlocked; + + @Override + public void handle(BlockedThreadEvent event) { + VertxException stackTrace = new VertxException("Thread blocked"); + stackTrace.setStackTrace(event.thread().getStackTrace()); + eventLoopBlocked = true; + } + + public boolean isEventLoopBlocked() { + return eventLoopBlocked; + } + + public void reset() { + eventLoopBlocked = false; + } + } }