This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-13636 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 518da60891cb33edae76746e3311e1ffe43e547c Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Jun 12 15:35:37 2019 +0200 CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines --- .../camel/impl/engine/AbstractCamelContext.java | 2 +- .../camel/impl/engine/DefaultReactiveExecutor.java | 47 ++++++++++++++- .../camel/impl/MultipleLifecycleStrategyTest.java | 2 +- core/camel-management-impl/pom.xml | 16 +++++ .../ManagedDefaultReactiveExecutorTest.java | 70 ++++++++++++++++++++++ .../management/ManagedNonManagedServiceTest.java | 4 +- ...edProducerRouteAddRemoveRegisterAlwaysTest.java | 2 +- .../management/ManagedRouteAddRemoveTest.java | 2 +- 8 files changed, 137 insertions(+), 8 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index 20eadae..de47b59 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -3142,6 +3142,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext getProducerServicePool(); getPollingConsumerServicePool(); getRestRegistryFactory(); + getReactiveExecutor(); if (isTypeConverterStatisticsEnabled() != null) { getTypeConverterRegistry().getStatistics().setStatisticsEnabled(isTypeConverterStatisticsEnabled()); @@ -3170,7 +3171,6 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext getBeanProxyFactory(); getBeanProcessorFactory(); getBeanPostProcessor(); - getReactiveExecutor(); } /** diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java index e094999..350e189 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java @@ -17,9 +17,14 @@ package org.apache.camel.impl.engine; import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import org.apache.camel.AsyncCallback; import org.apache.camel.StaticService; +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.support.service.ServiceSupport; import org.slf4j.Logger; @@ -28,13 +33,23 @@ import org.slf4j.LoggerFactory; /** * Default {@link ReactiveExecutor}. */ +@ManagedResource(description = "Managed ReactiveExecutor") public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveExecutor, StaticService { - // TODO: Add mbean info so we can get details - private static final Logger LOG = LoggerFactory.getLogger(DefaultReactiveExecutor.class); - private final ThreadLocal<Worker> workers = ThreadLocal.withInitial(Worker::new); + private final ThreadLocal<Worker> workers = ThreadLocal.withInitial(new Supplier<Worker>() { + @Override + public Worker get() { + createdWorkers.incrementAndGet(); + return new Worker(DefaultReactiveExecutor.this); + } + }); + + // use for statistics so we have insights at runtime + private final AtomicInteger createdWorkers = new AtomicInteger(); + private final AtomicInteger runningWorkers = new AtomicInteger(); + private final AtomicLong pendingTasks = new AtomicLong(); @Override public void scheduleMain(Runnable runnable, String description) { @@ -65,6 +80,21 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE return workers.get().executeFromQueue(); } + @ManagedAttribute(description = "Number of created workers") + public int getCreatedWorkers() { + return createdWorkers.get(); + } + + @ManagedAttribute(description = "Number of running workers") + public int getRunningWorkers() { + return runningWorkers.get(); + } + + @ManagedAttribute(description = "Number of pending tasks") + public long getPendingTasks() { + return pendingTasks.get(); + } + @Override public void callback(AsyncCallback callback) { schedule(new Runnable() { @@ -104,10 +134,15 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE private static class Worker { + private final DefaultReactiveExecutor executor; private volatile LinkedList<Runnable> queue = new LinkedList<>(); private volatile LinkedList<LinkedList<Runnable>> back; private volatile boolean running; + public Worker(DefaultReactiveExecutor executor) { + this.executor = executor; + } + void schedule(Runnable runnable, boolean first, boolean main, boolean sync) { if (main) { if (!queue.isEmpty()) { @@ -120,11 +155,14 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE } if (first) { queue.addFirst(runnable); + executor.pendingTasks.incrementAndGet(); } else { queue.addLast(runnable); + executor.pendingTasks.incrementAndGet(); } if (!running || sync) { running = true; + executor.runningWorkers.incrementAndGet(); // Thread thread = Thread.currentThread(); // String name = thread.getName(); try { @@ -139,6 +177,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE } } try { + executor.pendingTasks.decrementAndGet(); // thread.setName(name + " - " + polled.toString()); polled.run(); } catch (Throwable t) { @@ -148,6 +187,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE } finally { // thread.setName(name); running = false; + executor.runningWorkers.decrementAndGet(); } } else { LOG.debug("Queuing reactive work: {}", runnable); @@ -162,6 +202,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE Thread thread = Thread.currentThread(); String name = thread.getName(); try { + executor.pendingTasks.decrementAndGet(); thread.setName(name + " - " + polled.toString()); polled.run(); } catch (Throwable t) { diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java index caffd55..4c0ffc8 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java @@ -52,7 +52,7 @@ public class MultipleLifecycleStrategyTest extends TestSupport { List<String> expectedEvents = Arrays.asList("onContextStart", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", - "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop"); + "onServiceAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop"); assertEquals(expectedEvents, dummy1.getEvents()); assertEquals(expectedEvents, dummy2.getEvents()); diff --git a/core/camel-management-impl/pom.xml b/core/camel-management-impl/pom.xml index 3d6fef6..58aca2f 100644 --- a/core/camel-management-impl/pom.xml +++ b/core/camel-management-impl/pom.xml @@ -109,6 +109,22 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> + <!-- logging for testing --> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <scope>test</scope> + </dependency> </dependencies> diff --git a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedDefaultReactiveExecutorTest.java b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedDefaultReactiveExecutorTest.java new file mode 100644 index 0000000..a6cb317 --- /dev/null +++ b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedDefaultReactiveExecutorTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class ManagedDefaultReactiveExecutorTest extends ManagementTestSupport { + + @Test + public void testReactiveExecutor() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("seda:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:start") + .to("log:foo") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // check mbeans + MBeanServer mbeanServer = getMBeanServer(); + + ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,name=DefaultReactiveExecutor"); + assertTrue("Should be registered", mbeanServer.isRegistered(on)); + + // should be 1 running + Integer running = (Integer) mbeanServer.getAttribute(on, "RunningWorkers"); + assertEquals(1, running.intValue()); + + // should be 0 pending + Long pending = (Long) mbeanServer.getAttribute(on, "PendingTasks"); + assertEquals(0, pending.intValue()); + } + }) + .to("log:bar") + .to("mock:result"); + } + }; + } + + +} diff --git a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java index 6962165..3a249b5 100644 --- a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java +++ b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java @@ -29,7 +29,7 @@ import org.junit.Test; public class ManagedNonManagedServiceTest extends ManagementTestSupport { - private static final int SERVICES = 11; + private static final int SERVICES = 12; @Test public void testService() throws Exception { @@ -38,6 +38,8 @@ public class ManagedNonManagedServiceTest extends ManagementTestSupport { return; } + template.sendBody("direct:start", "Hello World"); + // must enable always as CamelContext has been started // and we add the service manually below context.getManagementStrategy().getManagementAgent().setRegisterAlways(true); diff --git a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java index 6c7cd92..4bb974c 100644 --- a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java +++ b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java @@ -28,7 +28,7 @@ import org.junit.Test; public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementTestSupport { - private int services = 11; + private int services = 12; @Override protected CamelContext createCamelContext() throws Exception { diff --git a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java index fff6e2d..339ec5f 100644 --- a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java +++ b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java @@ -33,7 +33,7 @@ import org.junit.Test; */ public class ManagedRouteAddRemoveTest extends ManagementTestSupport { - private static final int SERVICES = 11; + private static final int SERVICES = 12; @Override protected RouteBuilder createRouteBuilder() throws Exception {