CAMEL-8526: Add more EIP as specialized mbeans

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dfa458a2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dfa458a2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dfa458a2

Branch: refs/heads/master
Commit: dfa458a2e7b5ef3be40b17c22663d18c160ec97d
Parents: 6b0765b
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Jul 22 07:55:16 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Jul 22 07:55:16 2015 +0200

----------------------------------------------------------------------
 .../management/mbean/ManagedFilterMBean.java    |  2 +-
 .../management/mbean/ManagedThreadsMBean.java   | 29 +++++++
 .../management/mbean/ManagedValidateMBean.java  | 26 ++++++
 .../DefaultManagementObjectStrategy.java        | 17 +++-
 .../camel/management/mbean/ManagedFilter.java   |  2 +-
 .../camel/management/mbean/ManagedThreads.java  | 51 ++++++++++++
 .../camel/management/mbean/ManagedValidate.java | 42 ++++++++++
 .../camel/management/ManagedFilterTest.java     |  2 +-
 .../camel/management/ManagedThreadsTest.java    | 86 ++++++++++++++++++++
 9 files changed, 252 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dfa458a2/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFilterMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFilterMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFilterMBean.java
index 9dd63c3..71218d5 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFilterMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFilterMBean.java
@@ -21,7 +21,7 @@ import org.apache.camel.api.management.ManagedAttribute;
 public interface ManagedFilterMBean extends ManagedProcessorMBean {
 
     @ManagedAttribute(description = "Predicate to determine if the message 
should be filtered or not")
-    String getFilter();
+    String getPredicate();
 
     @ManagedAttribute(description = "Gets the number of Exchanges that matched 
the filter predicate and therefore as filtered")
     Long getFilteredCount();

http://git-wip-us.apache.org/repos/asf/camel/blob/dfa458a2/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThreadsMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThreadsMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThreadsMBean.java
new file mode 100644
index 0000000..fe0c48b
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThreadsMBean.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+
+public interface ManagedThreadsMBean extends ManagedProcessorMBean {
+
+    @ManagedAttribute(description = "Whether or not the caller should run the 
task when it was rejected by the thread pool")
+    Boolean isCallerRunsWhenRejected();
+
+    @ManagedAttribute(description = "How to handle tasks which cannot be 
accepted by the thread pool")
+    String getRejectedPolicy();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/dfa458a2/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedValidateMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedValidateMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedValidateMBean.java
new file mode 100644
index 0000000..d00e943
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedValidateMBean.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+
+public interface ManagedValidateMBean extends ManagedProcessorMBean {
+
+    @ManagedAttribute(description = "Predicate to determine if the message is 
valid or not")
+    String getPredicate();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/dfa458a2/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
index 76c57bb..2a870b6 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
@@ -66,9 +66,11 @@ import org.apache.camel.management.mbean.ManagedSetProperty;
 import org.apache.camel.management.mbean.ManagedSplitter;
 import org.apache.camel.management.mbean.ManagedSuspendableRoute;
 import org.apache.camel.management.mbean.ManagedThreadPool;
+import org.apache.camel.management.mbean.ManagedThreads;
 import org.apache.camel.management.mbean.ManagedThrottler;
 import org.apache.camel.management.mbean.ManagedThroughputLogger;
 import org.apache.camel.management.mbean.ManagedTransformer;
+import org.apache.camel.management.mbean.ManagedValidate;
 import org.apache.camel.management.mbean.ManagedWireTapProcessor;
 import org.apache.camel.model.ModelCamelContext;
 import org.apache.camel.model.ProcessorDefinition;
@@ -94,12 +96,14 @@ import org.apache.camel.processor.SetHeaderProcessor;
 import org.apache.camel.processor.SetPropertyProcessor;
 import org.apache.camel.processor.Splitter;
 import org.apache.camel.processor.StreamResequencer;
+import org.apache.camel.processor.ThreadsProcessor;
 import org.apache.camel.processor.Throttler;
 import org.apache.camel.processor.ThroughputLogger;
 import org.apache.camel.processor.TransformProcessor;
 import org.apache.camel.processor.WireTapProcessor;
 import org.apache.camel.processor.aggregate.AggregateProcessor;
 import org.apache.camel.processor.idempotent.IdempotentConsumer;
+import org.apache.camel.processor.validation.PredicateValidatingProcessor;
 import org.apache.camel.spi.BrowsableEndpoint;
 import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.ManagementObjectStrategy;
@@ -206,13 +210,18 @@ public class DefaultManagementObjectStrategy implements 
ManagementObjectStrategy
                                                ProcessorDefinition<?> 
definition, Route route) {
         ManagedProcessor answer = null;
 
-        if (definition instanceof RecipientListDefinition || definition 
instanceof ThreadsDefinition) {
-            // special for RecipientListDefinition/ThreadsDefinition, as the 
processor is wrapped in a pipeline as last
+        if (definition instanceof RecipientListDefinition) {
+            // special for RecipientListDefinition, as the processor is 
wrapped in a pipeline as last
             Pipeline pipeline = (Pipeline) processor;
             Iterator<Processor> it = pipeline.getProcessors().iterator();
             while (it.hasNext()) {
                 processor = it.next();
             }
+        } else if (definition instanceof ThreadsDefinition) {
+            // special for ThreadsDefinition, as the processor is wrapped in a 
pipeline as first
+            Pipeline pipeline = (Pipeline) processor;
+            Iterator<Processor> it = pipeline.getProcessors().iterator();
+            processor = it.next();
         }
 
         // unwrap delegates as we want the real target processor
@@ -255,8 +264,12 @@ public class DefaultManagementObjectStrategy implements 
ManagementObjectStrategy
                 answer = new ManagedSetHeader(context, (SetHeaderProcessor) 
target, definition);
             } else if (target instanceof SetPropertyProcessor) {
                 answer = new ManagedSetProperty(context, 
(SetPropertyProcessor) target, definition);
+            } else if (target instanceof ThreadsProcessor) {
+                answer = new ManagedThreads(context, (ThreadsProcessor) 
target, definition);
             } else if (target instanceof TransformProcessor) {
                 answer = new ManagedTransformer(context, (TransformProcessor) 
target, definition);
+            } else if (target instanceof PredicateValidatingProcessor) {
+                answer = new ManagedValidate(context, 
(PredicateValidatingProcessor) target, definition);
             } else if (target instanceof WireTapProcessor) {
                 answer = new ManagedWireTapProcessor(context, 
(WireTapProcessor) target, definition);
             } else if (target instanceof SendDynamicProcessor) {

http://git-wip-us.apache.org/repos/asf/camel/blob/dfa458a2/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFilter.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFilter.java 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFilter.java
index fc1afe8..edd89f2 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFilter.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFilter.java
@@ -41,7 +41,7 @@ public class ManagedFilter extends ManagedProcessor 
implements ManagedFilterMBea
     }
 
     @Override
-    public String getFilter() {
+    public String getPredicate() {
         return processor.getPredicate().toString();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/dfa458a2/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java
new file mode 100644
index 0000000..31c487e
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.ManagedThreadsMBean;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.ThreadsProcessor;
+
+/**
+ * @version 
+ */
+@ManagedResource(description = "Managed Threads")
+public class ManagedThreads extends ManagedProcessor implements 
ManagedThreadsMBean {
+    private final ThreadsProcessor processor;
+
+    public ManagedThreads(CamelContext context, ThreadsProcessor processor, 
ProcessorDefinition<?> definition) {
+        super(context, processor, definition);
+        this.processor = processor;
+    }
+
+    @Override
+    public Boolean isCallerRunsWhenRejected() {
+        return processor.isCallerRunsWhenRejected();
+    }
+
+    @Override
+    public String getRejectedPolicy() {
+        if (processor.getRejectedPolicy() != null) {
+            return processor.getRejectedPolicy().name();
+        } else {
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/dfa458a2/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedValidate.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedValidate.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedValidate.java
new file mode 100644
index 0000000..552444d
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedValidate.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.ManagedValidateMBean;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.validation.PredicateValidatingProcessor;
+
+/**
+ * @version 
+ */
+@ManagedResource(description = "Managed Validate")
+public class ManagedValidate extends ManagedProcessor implements 
ManagedValidateMBean {
+    private final PredicateValidatingProcessor processor;
+
+    public ManagedValidate(CamelContext context, PredicateValidatingProcessor 
processor, ProcessorDefinition<?> definition) {
+        super(context, processor, definition);
+        this.processor = processor;
+    }
+
+    @Override
+    public String getPredicate() {
+        return processor.getPredicate().toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/dfa458a2/camel-core/src/test/java/org/apache/camel/management/ManagedFilterTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedFilterTest.java 
b/camel-core/src/test/java/org/apache/camel/management/ManagedFilterTest.java
index 2d66095..29082e6 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedFilterTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedFilterTest.java
@@ -61,7 +61,7 @@ public class ManagedFilterTest extends ManagementTestSupport {
         Long count = (Long) mbeanServer.getAttribute(on, "FilteredCount");
         assertEquals(1, count.longValue());
 
-        String uri = (String) mbeanServer.getAttribute(on, "Filter");
+        String uri = (String) mbeanServer.getAttribute(on, "Predicate");
         assertEquals("header{header(foo)}", uri);
 
         TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new 
Object[]{false}, new String[]{"boolean"});

http://git-wip-us.apache.org/repos/asf/camel/blob/dfa458a2/camel-core/src/test/java/org/apache/camel/management/ManagedThreadsTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedThreadsTest.java 
b/camel-core/src/test/java/org/apache/camel/management/ManagedThreadsTest.java
new file mode 100644
index 0000000..096ecb1
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedThreadsTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class ManagedThreadsTest extends ManagementTestSupport {
+
+    public void testManageThreads() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        MockEndpoint foo = getMockEndpoint("mock:foo");
+        foo.expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "foo", 
"123");
+
+        assertMockEndpointsSatisfied();
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+
+        // get the object name for the delayer
+        ObjectName on = 
ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"mysend\"");
+
+        // should be on route1
+        String routeId = (String) mbeanServer.getAttribute(on, "RouteId");
+        assertEquals("route1", routeId);
+
+        String camelId = (String) mbeanServer.getAttribute(on, "CamelId");
+        assertEquals("camel-1", camelId);
+
+        String state = (String) mbeanServer.getAttribute(on, "State");
+        assertEquals(ServiceStatus.Started.name(), state);
+
+        TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new 
Object[]{false}, new String[]{"boolean"});
+        assertNotNull(data);
+        assertEquals(4, data.size());
+
+        data = (TabularData) mbeanServer.invoke(on, "explain", new 
Object[]{true}, new String[]{"boolean"});
+        assertNotNull(data);
+        assertEquals(13, data.size());
+
+        String json = (String) mbeanServer.invoke(on, "informationJson", null, 
null);
+        assertNotNull(json);
+        assertTrue(json.contains("\"description\": \"Specifies that all steps 
after this node are processed asynchronously"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .threads(1, 2).id("mysend")
+                        .to("mock:foo");
+            }
+        };
+    }
+
+}

Reply via email to