Author: davsclaus Date: Mon Jan 30 17:22:16 2012 New Revision: 1237807 URL: http://svn.apache.org/viewvc?rev=1237807&view=rev Log: CAMEL-4938: Fixed seda endpoint with JMX not being able to invoke certain methods.
Removed: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSedaEndpoint.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1237807&r1=1237806&r2=1237807&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Mon Jan 30 17:22:16 2012 @@ -28,13 +28,19 @@ import java.util.concurrent.LinkedBlocki import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.WaitForTaskToComplete; +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; +import org.apache.camel.api.management.ManagedResource; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.processor.MulticastProcessor; import org.apache.camel.spi.BrowsableEndpoint; +import org.apache.camel.util.EndpointHelper; +import org.apache.camel.util.MessageHelper; import org.apache.camel.util.ServiceHelper; /** @@ -42,6 +48,7 @@ import 
org.apache.camel.util.ServiceHelp * href="http://camel.apache.org/queue.html">Queue components</a> for * asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext */ +@ManagedResource(description = "Managed SedaEndpoint") public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport { private volatile BlockingQueue<Exchange> queue; private int size; @@ -133,6 +140,7 @@ public class SedaEndpoint extends Defaul this.size = queue.remainingCapacity(); } + @ManagedAttribute(description = "Queue max capacity") public int getSize() { return size; } @@ -141,6 +149,7 @@ public class SedaEndpoint extends Defaul this.size = size; } + @ManagedAttribute(description = "Current queue size") public int getCurrentQueueSize() { return queue.size(); } @@ -149,6 +158,7 @@ public class SedaEndpoint extends Defaul this.blockWhenFull = blockWhenFull; } + @ManagedAttribute(description = "Whether the caller will block sending to a full queue") public boolean isBlockWhenFull() { return blockWhenFull; } @@ -157,6 +167,7 @@ public class SedaEndpoint extends Defaul this.concurrentConsumers = concurrentConsumers; } + @ManagedAttribute(description = "Number of concurrent consumers") public int getConcurrentConsumers() { return concurrentConsumers; } @@ -169,6 +180,7 @@ public class SedaEndpoint extends Defaul this.waitForTaskToComplete = waitForTaskToComplete; } + @ManagedAttribute public long getTimeout() { return timeout; } @@ -177,6 +189,7 @@ public class SedaEndpoint extends Defaul this.timeout = timeout; } + @ManagedAttribute public boolean isMultipleConsumers() { return multipleConsumers; } @@ -196,6 +209,7 @@ public class SedaEndpoint extends Defaul return new ArrayList<Exchange>(getQueue()); } + @ManagedAttribute public boolean isMultipleConsumersSupported() { return isMultipleConsumers(); } @@ -203,6 +217,7 @@ public class SedaEndpoint extends Defaul /** * Purges the queue */ + @ManagedOperation(description = "Purges the seda 
queue") public void purgeQueue() { queue.clear(); } @@ -221,6 +236,74 @@ public class SedaEndpoint extends Defaul return new HashSet<SedaProducer>(producers); } + @ManagedOperation(description = "Current number of Exchanges in Queue") + public long queueSize() { + return getExchanges().size(); + } + + @ManagedOperation(description = "Get Exchange from queue by index") + public String browseExchange(Integer index) { + List<Exchange> exchanges = getExchanges(); + if (index >= exchanges.size()) { + return null; + } + Exchange exchange = exchanges.get(index); + if (exchange == null) { + return null; + } + // must use java type with JMX such as java.lang.String + return exchange.toString(); + } + + @ManagedOperation(description = "Get message body from queue by index") + public String browseMessageBody(Integer index) { + List<Exchange> exchanges = getExchanges(); + if (index >= exchanges.size()) { + return null; + } + Exchange exchange = exchanges.get(index); + if (exchange == null) { + return null; + } + + // must use java type with JMX such as java.lang.String + String body; + if (exchange.hasOut()) { + body = exchange.getOut().getBody(String.class); + } else { + body = exchange.getIn().getBody(String.class); + } + + return body; + } + + @ManagedOperation(description = "Get message as XML from queue by index") + public String browseMessageAsXml(Integer index, Boolean includeBody) { + List<Exchange> exchanges = getExchanges(); + if (index >= exchanges.size()) { + return null; + } + Exchange exchange = exchanges.get(index); + if (exchange == null) { + return null; + } + + Message msg = exchange.hasOut() ? 
exchange.getOut() : exchange.getIn(); + String xml = MessageHelper.dumpAsXml(msg, includeBody); + + return xml; + } + + @ManagedOperation(description = "Gets all the messages as XML from the queue") + public String browseAllMessagesAsXml(Boolean includeBody) { + return browseRangeMessagesAsXml(0, Integer.MAX_VALUE, includeBody); + } + + @ManagedOperation(description = "Gets the range of messages as XML from the queue") + public String browseRangeMessagesAsXml(Integer fromIndex, Integer toIndex, Boolean includeBody) { + return EndpointHelper.browseRangeMessagesAsXml(this, fromIndex, toIndex, includeBody); + } + void onStarted(SedaProducer producer) { producers.add(producer); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java?rev=1237807&r1=1237806&r2=1237807&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java Mon Jan 30 17:22:16 2012 @@ -44,7 +44,6 @@ import org.apache.camel.management.mbean import org.apache.camel.management.mbean.ManagedProducer; import org.apache.camel.management.mbean.ManagedRoute; import org.apache.camel.management.mbean.ManagedScheduledPollConsumer; -import org.apache.camel.management.mbean.ManagedSedaEndpoint; import org.apache.camel.management.mbean.ManagedSendProcessor; import org.apache.camel.management.mbean.ManagedService; import org.apache.camel.management.mbean.ManagedSuspendableRoute; @@ -92,10 +91,6 @@ public class DefaultManagementObjectStra if (endpoint instanceof org.apache.camel.spi.ManagementAware) { return ((org.apache.camel.spi.ManagementAware<Endpoint>) 
endpoint).getManagedObject(endpoint); - } else if (endpoint instanceof SedaEndpoint) { - ManagedSedaEndpoint me = new ManagedSedaEndpoint((SedaEndpoint) endpoint); - me.init(context.getManagementStrategy()); - return me; } else if (endpoint instanceof BrowsableEndpoint) { ManagedBrowsableEndpoint me = new ManagedBrowsableEndpoint((BrowsableEndpoint) endpoint); me.init(context.getManagementStrategy()); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java?rev=1237807&r1=1237806&r2=1237807&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java Mon Jan 30 17:22:16 2012 @@ -28,10 +28,12 @@ import java.util.regex.PatternSyntaxExce import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.PollingConsumer; import org.apache.camel.Processor; import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.Route; +import org.apache.camel.spi.BrowsableEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -394,4 +396,44 @@ public final class EndpointHelper { // not found return null; } + + /** + * Browses the {@link BrowsableEndpoint} within the given range, and returns the messages as an XML payload.
+ * + * @param endpoint the browsable endpoint + * @param fromIndex from range + * @param toIndex to range + * @param includeBody whether to include the message body in the XML payload + * @return XML payload with the messages + * @throws IllegalArgumentException if the from and to range is invalid + * @see MessageHelper#dumpAsXml(org.apache.camel.Message) + */ + public static String browseRangeMessagesAsXml(BrowsableEndpoint endpoint, Integer fromIndex, Integer toIndex, Boolean includeBody) { + if (fromIndex == null) { + fromIndex = 0; + } + if (toIndex == null) { + toIndex = Integer.MAX_VALUE; + } + if (fromIndex > toIndex) { + throw new IllegalArgumentException("From index cannot be larger than to index, was: " + fromIndex + " > " + toIndex); + } + + List<Exchange> exchanges = endpoint.getExchanges(); + if (exchanges.size() == 0) { + return null; + } + + StringBuilder sb = new StringBuilder(); + sb.append("<messages>"); + for (int i = fromIndex; i < exchanges.size() && i <= toIndex; i++) { + Exchange exchange = exchanges.get(i); + Message msg = exchange.hasOut() ? 
exchange.getOut() : exchange.getIn(); + String xml = MessageHelper.dumpAsXml(msg, includeBody); + sb.append("\n").append(xml); + } + sb.append("\n</messages>"); + return sb.toString(); + } + } Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java?rev=1237807&r1=1237806&r2=1237807&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java Mon Jan 30 17:22:16 2012 @@ -27,6 +27,7 @@ import org.apache.camel.api.management.M import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.BrowsableEndpoint; +import org.apache.camel.util.EndpointHelper; import org.apache.camel.util.MessageHelper; import org.springframework.jms.core.JmsOperations; @@ -162,31 +163,7 @@ public class JmsQueueEndpoint extends Jm @ManagedOperation(description = "Gets the range of messages as XML from the queue") public String browseRangeMessagesAsXml(Integer fromIndex, Integer toIndex, Boolean includeBody) { - if (fromIndex == null) { - fromIndex = 0; - } - if (toIndex == null) { - toIndex = Integer.MAX_VALUE; - } - if (fromIndex > toIndex) { - throw new IllegalArgumentException("From index cannot be larger than to index, was: " + fromIndex + " > " + toIndex); - } - - List<Exchange> exchanges = getExchanges(); - if (exchanges.size() == 0) { - return null; - } - - StringBuilder sb = new StringBuilder(); - sb.append("<messages>"); - for (int i = fromIndex; i < exchanges.size() && i <= toIndex; i++) { - Exchange exchange = exchanges.get(i); - Message msg = exchange.hasOut() ? 
exchange.getOut() : exchange.getIn(); - String xml = MessageHelper.dumpAsXml(msg, includeBody); - sb.append("\n").append(xml); - } - sb.append("\n</messages>"); - return sb.toString(); + return EndpointHelper.browseRangeMessagesAsXml(this, fromIndex, toIndex, includeBody); } protected QueueBrowseStrategy createQueueBrowseStrategy() { Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java?rev=1237807&r1=1237806&r2=1237807&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java (original) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java Mon Jan 30 17:22:16 2012 @@ -62,6 +62,8 @@ public class BrowsableQueueTest extends Object expected = expectedBodies[++index]; assertEquals("Body: " + index, expected, actual); } + + Thread.sleep(99999999); } @Test @@ -115,6 +117,8 @@ public class BrowsableQueueTest extends } protected CamelContext createCamelContext() throws Exception { + enableJMX(); + CamelContext camelContext = super.createCamelContext(); ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();