This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 1c5e254ab311e12576c36d5a0cd991d9ab7fb33a
Author: James Netherton <jamesnether...@gmail.com>
AuthorDate: Thu Mar 2 11:38:37 2023 +0000

    CAMEL-19109: Avoid blocking Vert.x event loop in vertx-websocket consumers
---
 .../websocket/VertxWebsocketClientConsumer.java    | 35 +++++++++--
 .../vertx/websocket/VertxWebsocketConsumer.java    | 45 +++++++++-----
 .../VertxWebSocketSlowClientConsumerTest.java      | 68 ++++++++++++++++++++++
 .../websocket/VertxWebSocketSlowConsumerTest.java  | 68 ++++++++++++++++++++++
 .../vertx/websocket/VertxWebSocketTestSupport.java | 35 +++++++++++
 5 files changed, 234 insertions(+), 17 deletions(-)

diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java
index d00e5bd67c2..42aece3170d 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java
@@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import io.vertx.core.Vertx;
 import io.vertx.core.http.WebSocket;
 import io.vertx.core.net.impl.ConnectionBase;
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -98,10 +97,38 @@ public class VertxWebsocketClientConsumer extends DefaultConsumer {
     }
 
     protected void handleResult(Object result) {
-        Exchange exchange = createExchange(true);
-        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        Exchange exchange = createExchange(false);
         Message message = exchange.getMessage();
         message.setBody(result);
-        getAsyncProcessor().process(exchange, cb);
+        processExchange(exchange);
+    }
+
+    /**
+     * Routes the given exchange through Vert.x {@code executeBlocking} so that the
+     * processing does not run on the thread that delivered the WebSocket message.
+     * <p>
+     * The unit of work is created inside the blocking task before the exchange is
+     * handed to the async processor. When the task settles, a failure is reported
+     * to the consumer's exception handler, and the unit of work is completed and
+     * the exchange released regardless of the outcome.
+     *
+     * @param exchange the exchange to process
+     */
+    protected void processExchange(Exchange exchange) {
+        getEndpoint().getVertx().executeBlocking(
+                future -> {
+                    try {
+                        createUoW(exchange);
+                    } catch (Exception uowFailure) {
+                        // Nothing was routed; surface the failure to the result handler
+                        future.fail(uowFailure);
+                        return;
+                    }
+
+                    // Settle the blocking task only once Camel signals that routing is done
+                    getAsyncProcessor().process(exchange, doneSync -> future.complete());
+                },
+                false,
+                outcome -> {
+                    try {
+                        if (outcome.failed()) {
+                            getExceptionHandler().handleException(outcome.cause());
+                        }
+                    } finally {
+                        // Always finish the unit of work and give the exchange back
+                        doneUoW(exchange);
+                        releaseExchange(exchange, false);
+                    }
+                });
     }
 }
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
index 39cd8de9db0..d505029d671 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
@@ -22,7 +22,6 @@ import io.vertx.core.http.ServerWebSocket;
 import io.vertx.core.net.SocketAddress;
 import io.vertx.core.net.impl.ConnectionBase;
 import io.vertx.ext.web.RoutingContext;
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
@@ -65,10 +64,7 @@ public class VertxWebsocketConsumer extends DefaultConsumer {
         Exchange exchange = createExchange(true);
         exchange.getMessage().setBody(message);
         populateExchangeHeaders(exchange, connectionKey, remote, routingContext, VertxWebsocketEvent.MESSAGE);
-
-        // use default consumer callback
-        AsyncCallback cb = defaultConsumerCallback(exchange, true);
-        getAsyncProcessor().process(exchange, cb);
+        processExchange(exchange, routingContext);
     }
 
     public void onException(String connectionKey, Throwable cause, SocketAddress remote, RoutingContext routingContext) {
@@ -88,19 +84,13 @@ public class VertxWebsocketConsumer extends DefaultConsumer {
         Exchange exchange = createExchange(true);
         populateExchangeHeaders(exchange, connectionKey, remote, routingContext, VertxWebsocketEvent.OPEN);
         exchange.getMessage().setBody(webSocket);
-
-        // use default consumer callback
-        AsyncCallback cb = defaultConsumerCallback(exchange, true);
-        getAsyncProcessor().process(exchange, cb);
+        processExchange(exchange, routingContext);
     }
 
     public void onClose(String connectionKey, SocketAddress remote, RoutingContext routingContext) {
         Exchange exchange = createExchange(true);
         populateExchangeHeaders(exchange, connectionKey, remote, routingContext, VertxWebsocketEvent.CLOSE);
-
-        // use default consumer callback
-        AsyncCallback cb = defaultConsumerCallback(exchange, true);
-        getAsyncProcessor().process(exchange, cb);
+        processExchange(exchange, routingContext);
     }
 
     protected void populateExchangeHeaders(
@@ -116,4 +106,33 @@ public class VertxWebsocketConsumer extends DefaultConsumer {
         routingContext.pathParams()
                 .forEach((name, value) -> VertxWebsocketHelper.appendHeader(headers, name, value));
     }
+
+    /**
+     * Routes the given exchange through Vert.x {@code executeBlocking}, using the
+     * Vert.x instance obtained from the routing context, so that the processing
+     * does not run on the thread that delivered the WebSocket event.
+     * <p>
+     * The unit of work is created inside the blocking task before the exchange is
+     * handed to the async processor. When the task settles, a failure is reported
+     * to the consumer's exception handler and to the routing context, and the unit
+     * of work is completed and the exchange released regardless of the outcome.
+     *
+     * @param exchange       the exchange to process
+     * @param routingContext the routing context of the WebSocket request; it is
+     *                       failed with the cause when processing fails
+     */
+    protected void processExchange(Exchange exchange, RoutingContext routingContext) {
+        routingContext.vertx().executeBlocking(
+                future -> {
+                    try {
+                        createUoW(exchange);
+                    } catch (Exception uowFailure) {
+                        // Nothing was routed; surface the failure to the result handler
+                        future.fail(uowFailure);
+                        return;
+                    }
+
+                    // Settle the blocking task only once Camel signals that routing is done
+                    getAsyncProcessor().process(exchange, doneSync -> future.complete());
+                },
+                false,
+                outcome -> {
+                    try {
+                        if (outcome.failed()) {
+                            Throwable failure = outcome.cause();
+                            getExceptionHandler().handleException(failure);
+                            routingContext.fail(failure);
+                        }
+                    } finally {
+                        // Always finish the unit of work and give the exchange back
+                        doneUoW(exchange);
+                        releaseExchange(exchange, false);
+                    }
+                });
+    }
 }
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowClientConsumerTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowClientConsumerTest.java
new file mode 100644
index 00000000000..fa6997a6ddf
--- /dev/null
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowClientConsumerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.websocket;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Checks that a slow route consuming as a WebSocket client does not cause a
+ * blocked-thread report from Vert.x. The client consumer route delays each
+ * message by 600 ms and the test asserts that the reporter was never triggered.
+ */
+public class VertxWebSocketSlowClientConsumerTest extends VertxWebSocketTestSupport {
+    private static final String MESSAGE_BODY = "Hello World";
+    private final BlockedThreadReporter reporter = new BlockedThreadReporter();
+
+    /** Clears the blocked-thread flag after each test. */
+    @AfterEach
+    public void afterEach() {
+        reporter.reset();
+    }
+
+    /** Binds a Vert.x instance whose blocked-thread events are sent to {@link #reporter}. */
+    @BindToRegistry
+    public Vertx createVertx() {
+        return createVertxWithThreadBlockedHandler(reporter);
+    }
+
+    @Test
+    void slowClientConsumerDoesNotBlockEventLoop() throws Exception {
+        MockEndpoint clientResult = getMockEndpoint("mock:clientConsumerResult");
+        clientResult.expectedBodiesReceived(MESSAGE_BODY);
+
+        String echoUri = "vertx-websocket:localhost:" + port + "/echo/slow";
+        template.sendBody(echoUri, MESSAGE_BODY);
+
+        clientResult.assertIsSatisfied();
+        assertFalse(reporter.isEventLoopBlocked(), "Expected Vert.x event loop to not be blocked");
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // Server side: send whatever arrives back out to all connected peers
+                fromF("vertx-websocket:localhost:%d/echo/slow", port)
+                        .toF("vertx-websocket:localhost:%d/echo/slow?sendToAll=true", port);
+
+                // Client side: deliberately slow consumer under test
+                fromF("vertx-websocket:localhost:%d/echo/slow?consumeAsClient=true", port)
+                        .delay(600).syncDelayed()
+                        .to("mock:clientConsumerResult");
+            }
+        };
+    }
+}
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java
new file mode 100644
index 00000000000..de982c6d61b
--- /dev/null
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.websocket;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Checks that a slow route consuming from a vertx-websocket endpoint does not
+ * cause a blocked-thread report from Vert.x. The consumer route delays each
+ * message by 600 ms and the test asserts that the reporter was never triggered.
+ */
+public class VertxWebSocketSlowConsumerTest extends VertxWebSocketTestSupport {
+    private static final String MESSAGE_BODY = "Hello World";
+    private final BlockedThreadReporter reporter = new BlockedThreadReporter();
+
+    /** Clears the blocked-thread flag after each test. */
+    @AfterEach
+    public void afterEach() {
+        reporter.reset();
+    }
+
+    /** Binds a Vert.x instance whose blocked-thread events are sent to {@link #reporter}. */
+    @BindToRegistry
+    public Vertx createVertx() {
+        return createVertxWithThreadBlockedHandler(reporter);
+    }
+
+    @Test
+    void slowConsumerDoesNotBlockEventLoop() throws Exception {
+        MockEndpoint consumerResult = getMockEndpoint("mock:result");
+        consumerResult.expectedBodiesReceived(MESSAGE_BODY);
+
+        template.requestBody("direct:start", MESSAGE_BODY);
+
+        consumerResult.assertIsSatisfied();
+        assertFalse(reporter.isEventLoopBlocked(), "Expected Vert.x event loop to not be blocked");
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // Producer side: push the test message to the WebSocket endpoint
+                from("direct:start")
+                        .toF("vertx-websocket:localhost:%d/slow", port);
+
+                // Consumer side: deliberately slow consumer under test
+                fromF("vertx-websocket:localhost:%d/slow", port)
+                        .delay(600).syncDelayed()
+                        .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java
index ac5ff4e0882..f695197d6e5 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java
@@ -24,8 +24,12 @@ import java.util.function.Consumer;
 
 import io.vertx.core.Handler;
 import io.vertx.core.Vertx;
+import io.vertx.core.VertxException;
+import io.vertx.core.VertxOptions;
 import io.vertx.core.http.HttpClient;
 import io.vertx.core.http.WebSocket;
+import io.vertx.core.impl.VertxInternal;
+import io.vertx.core.impl.btc.BlockedThreadEvent;
 import io.vertx.ext.web.Route;
 import io.vertx.ext.web.Router;
 import io.vertx.ext.web.RoutingContext;
@@ -79,4 +83,35 @@ public class VertxWebSocketTestSupport extends CamelTestSupport {
         route.handler(handler);
         return router;
     }
+
+    /**
+     * Creates a Vert.x instance with a 500 ms maximum event loop execute time and
+     * a 10 ms blocked-thread check interval, and registers the given handler on
+     * its blocked thread checker.
+     *
+     * @param  handler receives the {@link BlockedThreadEvent}s raised by the blocked thread checker
+     * @return         the newly created Vert.x instance
+     */
+    public Vertx createVertxWithThreadBlockedHandler(Handler<BlockedThreadEvent> handler) {
+        VertxOptions options = new VertxOptions()
+                .setMaxEventLoopExecuteTime(500)
+                .setMaxEventLoopExecuteTimeUnit(TimeUnit.MILLISECONDS)
+                .setBlockedThreadCheckInterval(10)
+                .setBlockedThreadCheckIntervalUnit(TimeUnit.MILLISECONDS);
+
+        Vertx instance = Vertx.vertx(options);
+        // The checker is only reachable through the internal Vert.x API
+        VertxInternal internal = (VertxInternal) instance;
+        internal.blockedThreadChecker().setThreadBlockedHandler(handler);
+        return instance;
+    }
+
+    /**
+     * Blocked-thread handler for tests. It prints the stack trace of the thread
+     * named in the event and remembers that a blocked thread was reported.
+     */
+    static class BlockedThreadReporter implements Handler<BlockedThreadEvent> {
+        // volatile: the flag is set from the handler callback and read by the test
+        // method, which are not guaranteed to run on the same thread
+        private volatile boolean eventLoopBlocked;
+
+        /**
+         * Prints where the reported thread is currently executing and raises the flag.
+         *
+         * @param event the blocked thread event supplied by Vert.x
+         */
+        @Override
+        public void handle(BlockedThreadEvent event) {
+            VertxException stackTrace = new VertxException("Thread blocked");
+            stackTrace.setStackTrace(event.thread().getStackTrace());
+            stackTrace.printStackTrace();
+            eventLoopBlocked = true;
+        }
+
+        /**
+         * @return {@code true} if a blocked thread has been reported since the last {@link #reset()}
+         */
+        public boolean isEventLoopBlocked() {
+            return eventLoopBlocked;
+        }
+
+        /** Clears the flag so the reporter can be reused by the next test. */
+        public void reset() {
+            eventLoopBlocked = false;
+        }
+    }
 }

Reply via email to