This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch CAMEL-15197 in repository https://gitbox.apache.org/repos/asf/camel.git
commit c32c09022fbb430647ab092886543f8d642883d0 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Jun 27 08:11:03 2020 +0200 CAMEL-15197: Thread pool can use vertx executor to reuse its threading model. WIP. --- .../reactive/vertx/VertXThreadPoolFactory.java | 128 +++++++++++++++++++++ .../apache/camel/reactive/SplitParallelTest.java | 76 ++++++++++++ .../src/test/resources/log4j2.properties | 4 +- 3 files changed, 206 insertions(+), 2 deletions(-) diff --git a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java new file mode 100644 index 0000000..c592780 --- /dev/null +++ b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java @@ -0,0 +1,128 @@ +package org.apache.camel.reactive.vertx; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.vertx.core.Vertx; +import org.apache.camel.Experimental; +import org.apache.camel.spi.ThreadPoolFactory; +import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.support.DefaultThreadPoolFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Experimental +public class VertXThreadPoolFactory extends DefaultThreadPoolFactory implements ThreadPoolFactory { + + private static final Logger LOG = LoggerFactory.getLogger(VertXReactiveExecutor.class); + + private final ExecutorService executorService = new VertXExecutorService(); + private Vertx vertx; + + public Vertx getVertx() { + return vertx; + } + + /** + * To use an existing instance of {@link Vertx} instead of creating a default instance. + */ + public void setVertx(Vertx vertx) { + this.vertx = vertx; + } + + @Override + public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { + return executorService; + } + + @Override + public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return executorService; + } + + private class VertXExecutorService implements ExecutorService { + + @Override + public void shutdown() { + // noop + } + + @Override + public List<Runnable> shutdownNow() { + // noop + return null; + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return false; + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return null; + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return null; + } + + @Override + public Future<?> submit(Runnable task) { + LOG.trace("submit: {}", task); + final CompletableFuture<?> answer = new CompletableFuture<>(); + // used by vertx + vertx.executeBlocking(future -> { + task.run(); + future.complete(); + }, res -> { answer.complete(null);} ); + return answer; + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + return null; + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return null; + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + return null; + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return null; + } + + @Override + public void execute(Runnable command) { + // noop + } + } + +} diff --git a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SplitParallelTest.java b/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SplitParallelTest.java new file mode 100644 index 0000000..2ed1254 --- /dev/null +++ b/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SplitParallelTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.reactive; + +import io.vertx.core.Vertx; +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.engine.AbstractCamelContext; +import org.apache.camel.reactive.vertx.VertXReactiveExecutor; +import org.apache.camel.reactive.vertx.VertXThreadPoolFactory; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class SplitParallelTest extends CamelTestSupport { + + private final Vertx vertx = Vertx.vertx(); + + @Override + protected CamelContext createCamelContext() throws Exception { + AbstractCamelContext context = new DefaultCamelContext(); + VertXReactiveExecutor re = new VertXReactiveExecutor(); + re.setVertx(vertx); + context.setReactiveExecutor(re); + + VertXThreadPoolFactory tpf = new VertXThreadPoolFactory(); + tpf.setVertx(vertx); + context.getExecutorServiceManager().setThreadPoolFactory(tpf); + + return context; + } + + @Test + public void testSplit() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C,D,E,F,G,H,I,J"); + getMockEndpoint("mock:split").expectedBodiesReceivedInAnyOrder("A","B", "C", "D", "E", "F", "G", "H", "I", "J"); + + template.sendBody("direct:start", "A,B,C,D,E,F,G,H,I,J"); + + assertMockEndpointsSatisfied(); + + vertx.close(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .to("log:foo") + .split(body()).parallelProcessing() + .delay(500).end() + .to("log:bar") + .to("mock:split") + .end() + .to("log:result") + .to("mock:result"); + } + }; + } +} diff --git a/components/camel-reactive-executor-vertx/src/test/resources/log4j2.properties b/components/camel-reactive-executor-vertx/src/test/resources/log4j2.properties index 2cebd6a..8daf4f4 100644 --- a/components/camel-reactive-executor-vertx/src/test/resources/log4j2.properties +++ b/components/camel-reactive-executor-vertx/src/test/resources/log4j2.properties @@ -27,5 +27,5 @@ appender.stdout.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n rootLogger.level = INFO -rootLogger.appenderRef.out.ref = out -#rootLogger.appenderRef.out.ref = stdout +#rootLogger.appenderRef.out.ref = out +rootLogger.appenderRef.out.ref = stdout