Repository: camel Updated Branches: refs/heads/master 2ac36c0c8 -> 0092f2cb4
CAMEL-8986: Add specialized mbean for enrich and pollEnrich Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0092f2cb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0092f2cb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0092f2cb Branch: refs/heads/master Commit: 0092f2cb43903227785abb6e61d8fd0ef1e30a54 Parents: 2ac36c0 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jul 21 12:33:23 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Jul 21 12:33:23 2015 +0200 ---------------------------------------------------------------------- .../management/mbean/ManagedEnricherMBean.java | 38 ++++++++ .../mbean/ManagedPollEnricherMBean.java | 38 ++++++++ .../DefaultManagementObjectStrategy.java | 8 ++ .../camel/management/mbean/ManagedEnricher.java | 78 ++++++++++++++++ .../management/mbean/ManagedPollEnricher.java | 78 ++++++++++++++++ .../org/apache/camel/processor/Enricher.java | 4 + .../apache/camel/processor/PollEnricher.java | 4 + .../camel/management/ManagedEnricherTest.java | 90 +++++++++++++++++++ .../management/ManagedPollEnricherTest.java | 93 ++++++++++++++++++++ 9 files changed, 431 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0092f2cb/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedEnricherMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedEnricherMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedEnricherMBean.java new file mode 100644 index 0000000..a20447c --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedEnricherMBean.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.api.management.mbean; + +import org.apache.camel.api.management.ManagedAttribute; + +public interface ManagedEnricherMBean extends ManagedProcessorMBean { + + @ManagedAttribute(description = "Expression that computes the endpoint uri to use as the resource endpoint to enrich from", mask = true) + String getExpression(); + + @ManagedAttribute(description = "Sets the maximum size used by the ProducerCache which is used to cache and reuse producers") + Integer getCacheSize(); + + @ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint") + Boolean isIgnoreInvalidEndpoint(); + + @ManagedAttribute(description = "Shares the UnitOfWork with the parent and the resource exchange") + Boolean isShareUnitOfWork(); + + @ManagedAttribute(description = "Whether to aggregate when there was an exception thrown during calling the resource endpoint") + Boolean isAggregateOnException(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/0092f2cb/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java new file mode 100644 index 0000000..830225b --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.api.management.mbean; + +import org.apache.camel.api.management.ManagedAttribute; + +public interface ManagedPollEnricherMBean extends ManagedProcessorMBean { + + @ManagedAttribute(description = "Expression that computes the endpoint uri to use as the resource endpoint to poll enrich from", mask = true) + String getExpression(); + + @ManagedAttribute(description = "Timeout in millis when polling from the external service") + Long getTimeout(); + + @ManagedAttribute(description = "Sets the maximum size used by the ConsumerCache which is used to cache and reuse consumers") + Integer getCacheSize(); + + @ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a consumer with that endpoint") + Boolean isIgnoreInvalidEndpoint(); + + @ManagedAttribute(description = "Whether to aggregate when there was an exception thrown during calling the resource endpoint") + Boolean isAggregateOnException(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/0092f2cb/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java index d56f24c..705fc2b 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java @@ -39,9 +39,11 @@ import org.apache.camel.management.mbean.ManagedComponent; import org.apache.camel.management.mbean.ManagedConsumer; import org.apache.camel.management.mbean.ManagedDelayer; import org.apache.camel.management.mbean.ManagedEndpoint; +import org.apache.camel.management.mbean.ManagedEnricher; import org.apache.camel.management.mbean.ManagedErrorHandler; import org.apache.camel.management.mbean.ManagedEventNotifier; import org.apache.camel.management.mbean.ManagedIdempotentConsumer; +import org.apache.camel.management.mbean.ManagedPollEnricher; import org.apache.camel.management.mbean.ManagedProcessor; import org.apache.camel.management.mbean.ManagedProducer; import org.apache.camel.management.mbean.ManagedRoute; @@ -57,7 +59,9 @@ import org.apache.camel.management.mbean.ManagedWireTapProcessor; import org.apache.camel.model.ModelCamelContext; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.processor.Delayer; +import org.apache.camel.processor.Enricher; import org.apache.camel.processor.ErrorHandler; +import org.apache.camel.processor.PollEnricher; import org.apache.camel.processor.SendDynamicProcessor; import org.apache.camel.processor.SendProcessor; import org.apache.camel.processor.Throttler; @@ -209,6 +213,10 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy answer = new ManagedIdempotentConsumer(context, (IdempotentConsumer) target, definition); } else if (target instanceof AggregateProcessor) { answer = new ManagedAggregateProcessor(context, (AggregateProcessor) target, (org.apache.camel.model.AggregateDefinition) definition); + } else if (target instanceof Enricher) { + answer = new ManagedEnricher(context, (Enricher) target, definition); + } else if (target instanceof PollEnricher) { + answer = new ManagedPollEnricher(context, (PollEnricher) target, definition); } else if (target instanceof org.apache.camel.spi.ManagementAware) { return ((org.apache.camel.spi.ManagementAware<Processor>) target).getManagedObject(processor); } http://git-wip-us.apache.org/repos/asf/camel/blob/0092f2cb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEnricher.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEnricher.java new file mode 100644 index 0000000..de01f56 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEnricher.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.mbean; + +import org.apache.camel.CamelContext; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.ManagedEnricherMBean; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.processor.Enricher; +import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.util.URISupport; + +/** + * @version + */ +@ManagedResource(description = "Managed Enricher") +public class ManagedEnricher extends ManagedProcessor implements ManagedEnricherMBean { + private final Enricher processor; + private String uri; + + public ManagedEnricher(CamelContext context, Enricher processor, ProcessorDefinition<?> definition) { + super(context, processor, definition); + this.processor = processor; + } + + public void init(ManagementStrategy strategy) { + super.init(strategy); + boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false; + uri = processor.getExpression().toString(); + if (sanitize) { + uri = URISupport.sanitizeUri(uri); + } + } + + @Override + public Enricher getProcessor() { + return processor; + } + + @Override + public String getExpression() { + return uri; + } + + @Override + public Integer getCacheSize() { + return processor.getCacheSize(); + } + + @Override + public Boolean isIgnoreInvalidEndpoint() { + return processor.isIgnoreInvalidEndpoint(); + } + + @Override + public Boolean isShareUnitOfWork() { + return processor.isShareUnitOfWork(); + } + + @Override + public Boolean isAggregateOnException() { + return processor.isAggregateOnException(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/0092f2cb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java new file mode 100644 index 0000000..8519219 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.mbean; + +import org.apache.camel.CamelContext; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.ManagedPollEnricherMBean; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.processor.PollEnricher; +import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.util.URISupport; + +/** + * @version + */ +@ManagedResource(description = "Managed PollEnricher") +public class ManagedPollEnricher extends ManagedProcessor implements ManagedPollEnricherMBean { + private final PollEnricher processor; + private String uri; + + public ManagedPollEnricher(CamelContext context, PollEnricher processor, ProcessorDefinition<?> definition) { + super(context, processor, definition); + this.processor = processor; + } + + public void init(ManagementStrategy strategy) { + super.init(strategy); + boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false; + uri = processor.getExpression().toString(); + if (sanitize) { + uri = URISupport.sanitizeUri(uri); + } + } + + @Override + public PollEnricher getProcessor() { + return processor; + } + + @Override + public String getExpression() { + return uri; + } + + @Override + public Long getTimeout() { + return processor.getTimeout(); + } + + @Override + public Integer getCacheSize() { + return processor.getCacheSize(); + } + + @Override + public Boolean isIgnoreInvalidEndpoint() { + return processor.isIgnoreInvalidEndpoint(); + } + + @Override + public Boolean isAggregateOnException() { + return processor.isAggregateOnException(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/0092f2cb/camel-core/src/main/java/org/apache/camel/processor/Enricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java index 06e0b8f..99a81ba 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java @@ -88,6 +88,10 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware, this.id = id; } + public Expression getExpression() { + return expression; + } + public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { this.aggregationStrategy = aggregationStrategy; } http://git-wip-us.apache.org/repos/asf/camel/blob/0092f2cb/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java index bb52bff..998926b 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -90,6 +90,10 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw this.id = id; } + public Expression getExpression() { + return expression; + } + public AggregationStrategy getAggregationStrategy() { return aggregationStrategy; } http://git-wip-us.apache.org/repos/asf/camel/blob/0092f2cb/camel-core/src/test/java/org/apache/camel/management/ManagedEnricherTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedEnricherTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedEnricherTest.java new file mode 100644 index 0000000..52c68bc --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedEnricherTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.openmbean.TabularData; + +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version + */ +public class ManagedEnricherTest extends ManagementTestSupport { + + public void testManageEnricher() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MockEndpoint foo = getMockEndpoint("mock:foo"); + foo.expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", "Hello World", "whereto", "foo"); + + assertMockEndpointsSatisfied(); + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + + // get the object name for the delayer + ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"mysend\""); + + // should be on route1 + String routeId = (String) mbeanServer.getAttribute(on, "RouteId"); + assertEquals("route1", routeId); + + String camelId = (String) mbeanServer.getAttribute(on, "CamelId"); + assertEquals("camel-1", camelId); + + String state = (String) mbeanServer.getAttribute(on, "State"); + assertEquals(ServiceStatus.Started.name(), state); + + String uri = (String) mbeanServer.getAttribute(on, "Expression"); + assertEquals("Simple: direct:${header.whereto}", uri); + + TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + assertNotNull(data); + assertEquals(2, data.size()); + + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"}); + assertNotNull(data); + assertEquals(10, data.size()); + + String json = (String) mbeanServer.invoke(on, "informationJson", null, null); + assertNotNull(json); + assertTrue(json.contains("\"description\": \"Enriches a message with data from a secondary resource")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .enrich().simple("direct:${header.whereto}").id("mysend"); + + from("direct:foo").to("mock:foo"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/0092f2cb/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java new file mode 100644 index 0000000..f679941 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.openmbean.TabularData; + +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version + */ +public class ManagedPollEnricherTest extends ManagementTestSupport { + + public void testManagePollEnricher() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MockEndpoint foo = getMockEndpoint("mock:foo"); + foo.expectedBodiesReceived("Hello World"); + + template.sendBody("seda:foo", "Hello World"); + template.sendBodyAndHeader("direct:start", "Hello", "whereto", "foo"); + + assertMockEndpointsSatisfied(); + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + + // get the object name for the delayer + ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"mysend\""); + + // should be on route1 + String routeId = (String) mbeanServer.getAttribute(on, "RouteId"); + assertEquals("route1", routeId); + + String camelId = (String) mbeanServer.getAttribute(on, "CamelId"); + assertEquals("camel-1", camelId); + + String state = (String) mbeanServer.getAttribute(on, "State"); + assertEquals(ServiceStatus.Started.name(), state); + + Long timout = (Long) mbeanServer.getAttribute(on, "Timeout"); + assertEquals(1000, timout.longValue()); + + String uri = (String) mbeanServer.getAttribute(on, "Expression"); + assertEquals("Simple: seda:${header.whereto}", uri); + + TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + assertNotNull(data); + assertEquals(3, data.size()); + + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"}); + assertNotNull(data); + assertEquals(10, data.size()); + + String json = (String) mbeanServer.invoke(on, "informationJson", null, null); + assertNotNull(json); + assertTrue(json.contains("\"description\": \"Enriches messages with data polled from a secondary resource")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .pollEnrich().simple("seda:${header.whereto}").timeout(1000).id("mysend") + .to("mock:foo"); + } + }; + } + +}