This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-13636
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 518da60891cb33edae76746e3311e1ffe43e547c
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Jun 12 15:35:37 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different 
reactive engines
---
 .../camel/impl/engine/AbstractCamelContext.java    |  2 +-
 .../camel/impl/engine/DefaultReactiveExecutor.java | 47 ++++++++++++++-
 .../camel/impl/MultipleLifecycleStrategyTest.java  |  2 +-
 core/camel-management-impl/pom.xml                 | 16 +++++
 .../ManagedDefaultReactiveExecutorTest.java        | 70 ++++++++++++++++++++++
 .../management/ManagedNonManagedServiceTest.java   |  4 +-
 ...edProducerRouteAddRemoveRegisterAlwaysTest.java |  2 +-
 .../management/ManagedRouteAddRemoveTest.java      |  2 +-
 8 files changed, 137 insertions(+), 8 deletions(-)

diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 20eadae..de47b59 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -3142,6 +3142,7 @@ public abstract class AbstractCamelContext extends 
ServiceSupport implements Ext
         getProducerServicePool();
         getPollingConsumerServicePool();
         getRestRegistryFactory();
+        getReactiveExecutor();
 
         if (isTypeConverterStatisticsEnabled() != null) {
             
getTypeConverterRegistry().getStatistics().setStatisticsEnabled(isTypeConverterStatisticsEnabled());
@@ -3170,7 +3171,6 @@ public abstract class AbstractCamelContext extends 
ServiceSupport implements Ext
         getBeanProxyFactory();
         getBeanProcessorFactory();
         getBeanPostProcessor();
-        getReactiveExecutor();
     }
 
     /**
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index e094999..350e189 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -17,9 +17,14 @@
 package org.apache.camel.impl.engine;
 
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.StaticService;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.support.service.ServiceSupport;
 import org.slf4j.Logger;
@@ -28,13 +33,23 @@ import org.slf4j.LoggerFactory;
 /**
  * Default {@link ReactiveExecutor}.
  */
+@ManagedResource(description = "Managed ReactiveExecutor")
 public class DefaultReactiveExecutor extends ServiceSupport implements 
ReactiveExecutor, StaticService {
 
-    // TODO: Add mbean info so we can get details
-
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultReactiveExecutor.class);
 
-    private final ThreadLocal<Worker> workers = 
ThreadLocal.withInitial(Worker::new);
+    private final ThreadLocal<Worker> workers = ThreadLocal.withInitial(new 
Supplier<Worker>() {
+        @Override
+        public Worker get() {
+            createdWorkers.incrementAndGet();
+            return new Worker(DefaultReactiveExecutor.this);
+        }
+    });
+
+    // use for statistics so we have insights at runtime
+    private final AtomicInteger createdWorkers = new AtomicInteger();
+    private final AtomicInteger runningWorkers = new AtomicInteger();
+    private final AtomicLong pendingTasks = new AtomicLong();
 
     @Override
     public void scheduleMain(Runnable runnable, String description) {
@@ -65,6 +80,21 @@ public class DefaultReactiveExecutor extends ServiceSupport 
implements ReactiveE
         return workers.get().executeFromQueue();
     }
 
+    @ManagedAttribute(description = "Number of created workers")
+    public int getCreatedWorkers() {
+        return createdWorkers.get();
+    }
+
+    @ManagedAttribute(description = "Number of running workers")
+    public int getRunningWorkers() {
+        return runningWorkers.get();
+    }
+
+    @ManagedAttribute(description = "Number of pending tasks")
+    public long getPendingTasks() {
+        return pendingTasks.get();
+    }
+
     @Override
     public void callback(AsyncCallback callback) {
         schedule(new Runnable() {
@@ -104,10 +134,15 @@ public class DefaultReactiveExecutor extends 
ServiceSupport implements ReactiveE
 
     private static class Worker {
 
+        private final DefaultReactiveExecutor executor;
         private volatile LinkedList<Runnable> queue = new LinkedList<>();
         private volatile LinkedList<LinkedList<Runnable>> back;
         private volatile boolean running;
 
+        public Worker(DefaultReactiveExecutor executor) {
+            this.executor = executor;
+        }
+
         void schedule(Runnable runnable, boolean first, boolean main, boolean 
sync) {
             if (main) {
                 if (!queue.isEmpty()) {
@@ -120,11 +155,14 @@ public class DefaultReactiveExecutor extends 
ServiceSupport implements ReactiveE
             }
             if (first) {
                 queue.addFirst(runnable);
+                executor.pendingTasks.incrementAndGet();
             } else {
                 queue.addLast(runnable);
+                executor.pendingTasks.incrementAndGet();
             }
             if (!running || sync) {
                 running = true;
+                executor.runningWorkers.incrementAndGet();
 //                Thread thread = Thread.currentThread();
 //                String name = thread.getName();
                 try {
@@ -139,6 +177,7 @@ public class DefaultReactiveExecutor extends ServiceSupport 
implements ReactiveE
                             }
                         }
                         try {
+                            executor.pendingTasks.decrementAndGet();
 //                            thread.setName(name + " - " + polled.toString());
                             polled.run();
                         } catch (Throwable t) {
@@ -148,6 +187,7 @@ public class DefaultReactiveExecutor extends ServiceSupport 
implements ReactiveE
                 } finally {
 //                    thread.setName(name);
                     running = false;
+                    executor.runningWorkers.decrementAndGet();
                 }
             } else {
                 LOG.debug("Queuing reactive work: {}", runnable);
@@ -162,6 +202,7 @@ public class DefaultReactiveExecutor extends ServiceSupport 
implements ReactiveE
             Thread thread = Thread.currentThread();
             String name = thread.getName();
             try {
+                executor.pendingTasks.decrementAndGet();
                 thread.setName(name + " - " + polled.toString());
                 polled.run();
             } catch (Throwable t) {
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
index caffd55..4c0ffc8 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
@@ -52,7 +52,7 @@ public class MultipleLifecycleStrategyTest extends 
TestSupport {
         List<String> expectedEvents = Arrays.asList("onContextStart",
             "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", 
"onServiceAdd", "onServiceAdd", "onServiceAdd",
             "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", 
"onServiceAdd", "onServiceAdd", "onServiceAdd",
-            "onComponentAdd", "onEndpointAdd", "onComponentRemove", 
"onContextStop");
+            "onServiceAdd", "onComponentAdd", "onEndpointAdd", 
"onComponentRemove", "onContextStop");
         
         assertEquals(expectedEvents, dummy1.getEvents());
         assertEquals(expectedEvents, dummy2.getEvents());
diff --git a/core/camel-management-impl/pom.xml 
b/core/camel-management-impl/pom.xml
index 3d6fef6..58aca2f 100644
--- a/core/camel-management-impl/pom.xml
+++ b/core/camel-management-impl/pom.xml
@@ -109,6 +109,22 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <!-- logging for testing -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 
diff --git 
a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedDefaultReactiveExecutorTest.java
 
b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedDefaultReactiveExecutorTest.java
new file mode 100644
index 0000000..a6cb317
--- /dev/null
+++ 
b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedDefaultReactiveExecutorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class ManagedDefaultReactiveExecutorTest extends ManagementTestSupport {
+
+    @Test
+    public void testReactiveExecutor() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("seda:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start")
+                    .to("log:foo")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws 
Exception {
+                            // check mbeans
+                            MBeanServer mbeanServer = getMBeanServer();
+
+                            ObjectName on = 
ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,name=DefaultReactiveExecutor");
+                            assertTrue("Should be registered", 
mbeanServer.isRegistered(on));
+
+                            // should be 1 running
+                            Integer running = (Integer) 
mbeanServer.getAttribute(on, "RunningWorkers");
+                            assertEquals(1, running.intValue());
+
+                            // should be 0 pending
+                            Long pending = (Long) mbeanServer.getAttribute(on, 
"PendingTasks");
+                            assertEquals(0, pending.intValue());
+                        }
+                    })
+                    .to("log:bar")
+                    .to("mock:result");
+            }
+        };
+    }
+
+
+}
diff --git 
a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
 
b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
index 6962165..3a249b5 100644
--- 
a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
+++ 
b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
@@ -29,7 +29,7 @@ import org.junit.Test;
 
 public class ManagedNonManagedServiceTest extends ManagementTestSupport {
 
-    private static final int SERVICES = 11;
+    private static final int SERVICES = 12;
 
     @Test
     public void testService() throws Exception {
@@ -38,6 +38,8 @@ public class ManagedNonManagedServiceTest extends 
ManagementTestSupport {
             return;
         }
 
+        template.sendBody("direct:start", "Hello World");
+
         // must enable always as CamelContext has been started
         // and we add the service manually below
         
context.getManagementStrategy().getManagementAgent().setRegisterAlways(true);
diff --git 
a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
 
b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
index 6c7cd92..4bb974c 100644
--- 
a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
+++ 
b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 
 public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends 
ManagementTestSupport {
 
-    private int services = 11;
+    private int services = 12;
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
diff --git 
a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
 
b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
index fff6e2d..339ec5f 100644
--- 
a/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
+++ 
b/core/camel-management-impl/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
@@ -33,7 +33,7 @@ import org.junit.Test;
  */
 public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
     
-    private static final int SERVICES = 11;
+    private static final int SERVICES = 12;
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {

Reply via email to