This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 1c5e254ab311e12576c36d5a0cd991d9ab7fb33a Author: James Netherton <jamesnether...@gmail.com> AuthorDate: Thu Mar 2 11:38:37 2023 +0000 CAMEL-19109: Avoid blocking Vert.x event loop in vertx-websocket consumers --- .../websocket/VertxWebsocketClientConsumer.java | 35 +++++++++-- .../vertx/websocket/VertxWebsocketConsumer.java | 45 +++++++++----- .../VertxWebSocketSlowClientConsumerTest.java | 68 ++++++++++++++++++++++ .../websocket/VertxWebSocketSlowConsumerTest.java | 68 ++++++++++++++++++++++ .../vertx/websocket/VertxWebSocketTestSupport.java | 35 +++++++++++ 5 files changed, 234 insertions(+), 17 deletions(-) diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java index d00e5bd67c2..42aece3170d 100644 --- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java +++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java @@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicInteger; import io.vertx.core.Vertx; import io.vertx.core.http.WebSocket; import io.vertx.core.net.impl.ConnectionBase; -import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -98,10 +97,38 @@ public class VertxWebsocketClientConsumer extends DefaultConsumer { } protected void handleResult(Object result) { - Exchange exchange = createExchange(true); - AsyncCallback cb = defaultConsumerCallback(exchange, true); + Exchange exchange = createExchange(false); Message message = exchange.getMessage(); message.setBody(result); - getAsyncProcessor().process(exchange, cb); + processExchange(exchange); + } + + protected void processExchange(Exchange exchange) { + Vertx vertx = getEndpoint().getVertx(); + vertx.executeBlocking( + promise -> { + try { + createUoW(exchange); + } catch (Exception e) { + promise.fail(e); + return; + } + + getAsyncProcessor().process(exchange, c -> { + promise.complete(); + }); + }, + false, + result -> { + try { + if (result.failed()) { + Throwable cause = result.cause(); + getExceptionHandler().handleException(cause); + } + } finally { + doneUoW(exchange); + releaseExchange(exchange, false); + } + }); } } diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java index 39cd8de9db0..d505029d671 100644 --- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java +++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java @@ -22,7 +22,6 @@ import io.vertx.core.http.ServerWebSocket; import io.vertx.core.net.SocketAddress; import io.vertx.core.net.impl.ConnectionBase; import io.vertx.ext.web.RoutingContext; -import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; @@ -65,10 +64,7 @@ public class VertxWebsocketConsumer extends DefaultConsumer { Exchange exchange = createExchange(true); exchange.getMessage().setBody(message); populateExchangeHeaders(exchange, connectionKey, remote, routingContext, VertxWebsocketEvent.MESSAGE); - - // use default consumer callback - AsyncCallback cb = defaultConsumerCallback(exchange, true); - getAsyncProcessor().process(exchange, cb); + processExchange(exchange, routingContext); } public void onException(String connectionKey, Throwable cause, SocketAddress remote, RoutingContext routingContext) { @@ -88,19 +84,13 @@ public class VertxWebsocketConsumer extends DefaultConsumer { Exchange exchange = createExchange(true); populateExchangeHeaders(exchange, connectionKey, remote, routingContext, VertxWebsocketEvent.OPEN); exchange.getMessage().setBody(webSocket); - - // use default consumer callback - AsyncCallback cb = defaultConsumerCallback(exchange, true); - getAsyncProcessor().process(exchange, cb); + processExchange(exchange, routingContext); } public void onClose(String connectionKey, SocketAddress remote, RoutingContext routingContext) { Exchange exchange = createExchange(true); populateExchangeHeaders(exchange, connectionKey, remote, routingContext, VertxWebsocketEvent.CLOSE); - - // use default consumer callback - AsyncCallback cb = defaultConsumerCallback(exchange, true); - getAsyncProcessor().process(exchange, cb); + processExchange(exchange, routingContext); } protected void populateExchangeHeaders( @@ -116,4 +106,33 @@ public class VertxWebsocketConsumer extends DefaultConsumer { routingContext.pathParams() .forEach((name, value) -> VertxWebsocketHelper.appendHeader(headers, name, value)); } + + protected void processExchange(Exchange exchange, RoutingContext routingContext) { + routingContext.vertx().executeBlocking( + promise -> { + try { + createUoW(exchange); + } catch (Exception e) { + promise.fail(e); + return; + } + + getAsyncProcessor().process(exchange, c -> { + promise.complete(); + }); + }, + false, + result -> { + try { + if (result.failed()) { + Throwable cause = result.cause(); + getExceptionHandler().handleException(cause); + routingContext.fail(cause); + } + } finally { + doneUoW(exchange); + releaseExchange(exchange, false); + } + }); + } } diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowClientConsumerTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowClientConsumerTest.java new file mode 100644 index 00000000000..fa6997a6ddf --- /dev/null +++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowClientConsumerTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.websocket; + +import io.vertx.core.Vertx; +import org.apache.camel.BindToRegistry; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class VertxWebSocketSlowClientConsumerTest extends VertxWebSocketTestSupport { + private static final String MESSAGE_BODY = "Hello World"; + private final BlockedThreadReporter reporter = new BlockedThreadReporter(); + + @AfterEach + public void afterEach() { + reporter.reset(); + } + + @BindToRegistry + public Vertx createVertx() { + return createVertxWithThreadBlockedHandler(reporter); + } + + @Test + void slowClientConsumerDoesNotBlockEventLoop() throws Exception { + MockEndpoint mockEndpoint = getMockEndpoint("mock:clientConsumerResult"); + mockEndpoint.expectedBodiesReceived(MESSAGE_BODY); + + template.sendBody("vertx-websocket:localhost:" + port + "/echo/slow", MESSAGE_BODY); + + mockEndpoint.assertIsSatisfied(); + assertFalse(reporter.isEventLoopBlocked(), "Expected Vert.x event loop to not be blocked"); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + fromF("vertx-websocket:localhost:%d/echo/slow", port) + .toF("vertx-websocket:localhost:%d/echo/slow?sendToAll=true", port); + + fromF("vertx-websocket:localhost:%d/echo/slow?consumeAsClient=true", port) + .delay(600).syncDelayed() + .to("mock:clientConsumerResult"); + } + }; + } +} diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java new file mode 100644 index 00000000000..de982c6d61b --- /dev/null +++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.websocket; + +import io.vertx.core.Vertx; +import org.apache.camel.BindToRegistry; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class VertxWebSocketSlowConsumerTest extends VertxWebSocketTestSupport { + private static final String MESSAGE_BODY = "Hello World"; + private final BlockedThreadReporter reporter = new BlockedThreadReporter(); + + @AfterEach + public void afterEach() { + reporter.reset(); + } + + @BindToRegistry + public Vertx createVertx() { + return createVertxWithThreadBlockedHandler(reporter); + } + + @Test + void slowConsumerDoesNotBlockEventLoop() throws Exception { + MockEndpoint mockEndpoint = getMockEndpoint("mock:result"); + mockEndpoint.expectedBodiesReceived(MESSAGE_BODY); + + template.requestBody("direct:start", MESSAGE_BODY); + + mockEndpoint.assertIsSatisfied(); + assertFalse(reporter.isEventLoopBlocked(), "Expected Vert.x event loop to not be blocked"); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .toF("vertx-websocket:localhost:%d/slow", port); + + fromF("vertx-websocket:localhost:%d/slow", port) + .delay(600).syncDelayed() + .to("mock:result"); + } + }; + } +} diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java index ac5ff4e0882..f695197d6e5 100644 --- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java +++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java @@ -24,8 +24,12 @@ import java.util.function.Consumer; import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.core.VertxException; +import io.vertx.core.VertxOptions; import io.vertx.core.http.HttpClient; import io.vertx.core.http.WebSocket; +import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.btc.BlockedThreadEvent; import io.vertx.ext.web.Route; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; @@ -79,4 +83,35 @@ public class VertxWebSocketTestSupport extends CamelTestSupport { route.handler(handler); return router; } + + public Vertx createVertxWithThreadBlockedHandler(Handler<BlockedThreadEvent> handler) { + VertxOptions vertxOptions = new VertxOptions(); + vertxOptions.setMaxEventLoopExecuteTime(500); + vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.MILLISECONDS); + vertxOptions.setBlockedThreadCheckInterval(10); + vertxOptions.setBlockedThreadCheckIntervalUnit(TimeUnit.MILLISECONDS); + Vertx vertx = Vertx.vertx(vertxOptions); + ((VertxInternal) vertx).blockedThreadChecker().setThreadBlockedHandler(handler); + return vertx; + } + + static class BlockedThreadReporter implements Handler<BlockedThreadEvent> { + private boolean eventLoopBlocked; + + @Override + public void handle(BlockedThreadEvent event) { + VertxException stackTrace = new VertxException("Thread blocked"); + stackTrace.setStackTrace(event.thread().getStackTrace()); + stackTrace.printStackTrace(); + eventLoopBlocked = true; + } + + public boolean isEventLoopBlocked() { + return eventLoopBlocked; + } + + public void reset() { + eventLoopBlocked = false; + } + } }