CAMEL-8965: Add mbean for wiretap
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4376cb3d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4376cb3d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4376cb3d Branch: refs/heads/master Commit: 4376cb3db8380bf8d21a60ec3910763a85b1e74a Parents: b269ae5 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Jul 20 17:55:10 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Jul 20 22:54:08 2015 +0200 ---------------------------------------------------------------------- .../mbean/ManagedSendDynamicProcessorMBean.java | 2 +- .../management/mbean/ManagedWireTapMBean.java | 35 ++++++++ .../DefaultManagementObjectStrategy.java | 8 +- .../mbean/ManagedWireTapProcessor.java | 70 +++++++++++++++ .../apache/camel/model/EnrichDefinition.java | 4 +- .../camel/model/PollEnrichDefinition.java | 2 +- .../apache/camel/model/WireTapDefinition.java | 67 +++++++++++++++ .../camel/processor/WireTapProcessor.java | 23 +++++ .../camel/management/ManagedWireTapTest.java | 90 ++++++++++++++++++++ 9 files changed, 295 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java index e1071ceb..9a6ae94 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java @@ -26,7 +26,7 @@ public interface ManagedSendDynamicProcessorMBean extends ManagedProcessorMBean @ManagedAttribute(description = "Message Exchange Pattern") String getMessageExchangePattern(); - @ManagedAttribute(description = "Sets the maximum size used by the ProducerCacheN which is used to cache and reuse producers.") + @ManagedAttribute(description = "Sets the maximum size used by the ProducerCache which is used to cache and reuse producers") Integer getCacheSize(); @ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint") http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java new file mode 100644 index 0000000..52663cc --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.api.management.mbean; + +import org.apache.camel.api.management.ManagedAttribute; + +public interface ManagedWireTapMBean extends ManagedProcessorMBean { + + @ManagedAttribute(description = "Expression that returns the uri to use for the wire tap destination", mask = true) + String getExpression(); + + @ManagedAttribute(description = "Sets the maximum size used by the ProducerCache which is used to cache and reuse producers") + Integer getCacheSize(); + + @ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint") + Boolean isIgnoreInvalidEndpoint(); + + @ManagedAttribute(description = "Uses a copy of the original exchange") + Boolean isCopy(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java index a8df7a9..d56f24c 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java @@ -38,7 +38,6 @@ import org.apache.camel.management.mbean.ManagedCamelContext; import org.apache.camel.management.mbean.ManagedComponent; import org.apache.camel.management.mbean.ManagedConsumer; import org.apache.camel.management.mbean.ManagedDelayer; -import org.apache.camel.management.mbean.ManagedSendDynamicProcessor; import org.apache.camel.management.mbean.ManagedEndpoint; import org.apache.camel.management.mbean.ManagedErrorHandler; import org.apache.camel.management.mbean.ManagedEventNotifier; @@ -47,20 +46,23 @@ import org.apache.camel.management.mbean.ManagedProcessor; import org.apache.camel.management.mbean.ManagedProducer; import org.apache.camel.management.mbean.ManagedRoute; import org.apache.camel.management.mbean.ManagedScheduledPollConsumer; +import org.apache.camel.management.mbean.ManagedSendDynamicProcessor; import org.apache.camel.management.mbean.ManagedSendProcessor; import org.apache.camel.management.mbean.ManagedService; import org.apache.camel.management.mbean.ManagedSuspendableRoute; import org.apache.camel.management.mbean.ManagedThreadPool; import org.apache.camel.management.mbean.ManagedThrottler; import org.apache.camel.management.mbean.ManagedThroughputLogger; +import org.apache.camel.management.mbean.ManagedWireTapProcessor; import org.apache.camel.model.ModelCamelContext; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.processor.Delayer; -import org.apache.camel.processor.SendDynamicProcessor; import org.apache.camel.processor.ErrorHandler; +import org.apache.camel.processor.SendDynamicProcessor; import org.apache.camel.processor.SendProcessor; import org.apache.camel.processor.Throttler; import org.apache.camel.processor.ThroughputLogger; +import org.apache.camel.processor.WireTapProcessor; import org.apache.camel.processor.aggregate.AggregateProcessor; import org.apache.camel.processor.idempotent.IdempotentConsumer; import org.apache.camel.spi.BrowsableEndpoint; @@ -183,6 +185,8 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy answer = new ManagedDelayer(context, (Delayer) target, definition); } else if (target instanceof Throttler) { answer = new ManagedThrottler(context, (Throttler) target, definition); + } else if (target instanceof WireTapProcessor) { + answer = new ManagedWireTapProcessor(context, (WireTapProcessor) target, definition); } else if (target instanceof SendDynamicProcessor) { answer = new ManagedSendDynamicProcessor(context, (SendDynamicProcessor) target, definition); } else if (target instanceof SendProcessor) { http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java new file mode 100644 index 0000000..60471fd --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.mbean; + +import org.apache.camel.CamelContext; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.ManagedWireTapMBean; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.processor.WireTapProcessor; +import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.util.URISupport; + +/** + * @version + */ +@ManagedResource(description = "Managed WireTapProcessor") +public class ManagedWireTapProcessor extends ManagedProcessor implements ManagedWireTapMBean { + private final WireTapProcessor processor; + private String uri; + + public ManagedWireTapProcessor(CamelContext context, WireTapProcessor processor, ProcessorDefinition<?> definition) { + super(context, processor, definition); + this.processor = processor; + } + + public void init(ManagementStrategy strategy) { + super.init(strategy); + boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false; + if (sanitize) { + uri = URISupport.sanitizeUri(processor.getExpression().toString()); + } else { + uri = processor.getExpression().toString(); + } + } + + public WireTapProcessor getProcessor() { + return processor; + } + + public String getExpression() { + return uri; + } + + public Integer getCacheSize() { + return processor.getCacheSize(); + } + + public Boolean isIgnoreInvalidEndpoint() { + return processor.isIgnoreInvalidEndpoint(); + } + + public Boolean isCopy() { + return processor.isCopy(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java index 1d05f60..6143f17 100644 --- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java @@ -182,8 +182,8 @@ public class EnrichDefinition extends NoOutputExpressionNode { } /** - * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used - * to cache and reuse consumers when using this pollEnrich, when uris are reused. + * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used + * to cache and reuse producer when uris are reused. * * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off. * @return the builder http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java index f200051..365cc81 100644 --- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java @@ -196,7 +196,7 @@ public class PollEnrichDefinition extends NoOutputExpressionNode { /** * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used - * to cache and reuse consumers when using this pollEnrich, when uris are reused. + * to cache and reuse consumers when uris are reused. * * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off. * @return the builder http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java index 64e339e..22c4941 100644 --- a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java @@ -30,6 +30,7 @@ import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; import org.apache.camel.Processor; +import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.SendDynamicProcessor; import org.apache.camel.processor.WireTapProcessor; @@ -59,9 +60,13 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo @XmlAttribute @Metadata(defaultValue = "true") private Boolean copy; @XmlAttribute + private Integer cacheSize; + @XmlAttribute private String onPrepareRef; @XmlTransient private Processor onPrepare; + @XmlAttribute + private Boolean ignoreInvalidEndpoint; public WireTapDefinition() { } @@ -75,6 +80,12 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo // create the send dynamic producer to send to the wire tapped endpoint SendDynamicProcessor dynamicTo = new SendDynamicProcessor(getExpression()); dynamicTo.setCamelContext(routeContext.getCamelContext()); + if (cacheSize != null) { + dynamicTo.setCacheSize(cacheSize); + } + if (ignoreInvalidEndpoint != null) { + dynamicTo.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint); + } // create error handler we need to use for processing the wire tapped Processor target = wrapInErrorHandler(routeContext, dynamicTo); @@ -109,6 +120,12 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo if (onPrepare != null) { answer.setOnPrepare(onPrepare); } + if (cacheSize != null) { + answer.setCacheSize(cacheSize); + } + if (ignoreInvalidEndpoint != null) { + answer.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint); + } return answer; } @@ -261,6 +278,40 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo return this; } + /** + * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used + * to cache and reuse producers, when uris are reused. + * + * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off. + * @return the builder + */ + public WireTapDefinition cacheSize(int cacheSize) { + setCacheSize(cacheSize); + return this; + } + + /** + * Ignore the invalidate endpoint exception when try to create a producer with that endpoint + * + * @return the builder + */ + public WireTapDefinition ignoreInvalidEndpoint() { + setIgnoreInvalidEndpoint(true); + return this; + } + + // Properties + //------------------------------------------------------------------------- + + /** + * Expression that returns the uri to use for the wire tap destination + */ + @Override + public void setExpression(ExpressionDefinition expression) { + // override to include javadoc what the expression is used for + super.setExpression(expression); + } + public Processor getNewExchangeProcessor() { return newExchangeProcessor; } @@ -345,4 +396,20 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo public void setHeaders(List<SetHeaderDefinition> headers) { this.headers = headers; } + + public Integer getCacheSize() { + return cacheSize; + } + + public void setCacheSize(Integer cacheSize) { + this.cacheSize = cacheSize; + } + + public Boolean getIgnoreInvalidEndpoint() { + return ignoreInvalidEndpoint; + } + + public void setIgnoreInvalidEndpoint(Boolean ignoreInvalidEndpoint) { + this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java index 87a3365..fc09ea8 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -57,6 +57,9 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, private final ExchangePattern exchangePattern; private final ExecutorService executorService; private volatile boolean shutdownExecutorService; + // only used for management to be able to report the setting + private int cacheSize; + private boolean ignoreInvalidEndpoint; // expression or processor used for populating a new exchange to send // as opposed to traditional wiretap that sends a copy of the original exchange @@ -204,6 +207,10 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly); } + public Expression getExpression() { + return expression; + } + public List<Processor> getNewExchangeProcessors() { return newExchangeProcessors; } @@ -243,6 +250,22 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, this.onPrepare = onPrepare; } + public int getCacheSize() { + return cacheSize; + } + + public void setCacheSize(int cacheSize) { + this.cacheSize = cacheSize; + } + + public boolean isIgnoreInvalidEndpoint() { + return ignoreInvalidEndpoint; + } + + public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) { + this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; + } + @Override protected void doStart() throws Exception { ServiceHelper.startService(processor); http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java new file mode 100644 index 0000000..31e0a37 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.openmbean.TabularData; + +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version + */ +public class ManagedWireTapTest extends ManagementTestSupport { + + public void testManageWireTap() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MockEndpoint foo = getMockEndpoint("mock:foo"); + foo.expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", "Hello World", "whereto", "foo"); + + assertMockEndpointsSatisfied(); + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + + // get the object name for the delayer + ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"mysend\""); + + // should be on route1 + String routeId = (String) mbeanServer.getAttribute(on, "RouteId"); + assertEquals("route1", routeId); + + String camelId = (String) mbeanServer.getAttribute(on, "CamelId"); + assertEquals("camel-1", camelId); + + String state = (String) mbeanServer.getAttribute(on, "State"); + assertEquals(ServiceStatus.Started.name(), state); + + String uri = (String) mbeanServer.getAttribute(on, "Expression"); + assertEquals("simple{direct:${header.whereto}}", uri); + + TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + assertNotNull(data); + assertEquals(2, data.size()); + + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"}); + assertNotNull(data); + assertEquals(10, data.size()); + + String json = (String) mbeanServer.invoke(on, "informationJson", null, null); + assertNotNull(json); + assertTrue(json.contains("\"description\": \"Routes a copy of a message (or creates a new message) to a secondary destination while continue routing the original message")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .wireTap("direct:${header.whereto}").id("mysend"); + + from("direct:foo").to("mock:foo"); + } + }; + } + +}