Author: davsclaus Date: Wed Sep 2 07:52:47 2009 New Revision: 810399 URL: http://svn.apache.org/viewvc?rev=810399&view=rev Log: CAMEL-1976: restarting jms consumers will pickup jms endpoint changes such as managed using JMX. CAMEL-1933: Overhaul of JMX. Management of JmsEndpoint now possible. Exposed concurrent consumers but the other options to follow.
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java (with props) Modified: camel/trunk/components/camel-jms/pom.xml camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java Modified: camel/trunk/components/camel-jms/pom.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/pom.xml?rev=810399&r1=810398&r2=810399&view=diff ============================================================================== --- camel/trunk/components/camel-jms/pom.xml (original) +++ camel/trunk/components/camel-jms/pom.xml Wed Sep 2 07:52:47 2009 @@ -49,6 +49,10 @@ <artifactId>spring-jms</artifactId> </dependency> <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + </dependency> + <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> </dependency> Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=810399&r1=810398&r2=810399&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Wed Sep 2 07:52:47 2009 @@ -27,33 +27,53 @@ * @version $Revision$ */ public class JmsConsumer extends DefaultConsumer { - private final 
AbstractMessageListenerContainer listenerContainer; + private AbstractMessageListenerContainer listenerContainer; private EndpointMessageListener messageListener; public JmsConsumer(JmsEndpoint endpoint, Processor processor, AbstractMessageListenerContainer listenerContainer) { super(endpoint, processor); this.listenerContainer = listenerContainer; + this.listenerContainer.setMessageListener(getEndpointMessageListener()); + } - createMessageListener(endpoint, processor); - this.listenerContainer.setMessageListener(messageListener); + public JmsEndpoint getEndpoint() { + return (JmsEndpoint) super.getEndpoint(); } public AbstractMessageListenerContainer getListenerContainer() { + if (listenerContainer == null) { + createMessageListenerContainer(); + } return listenerContainer; } public EndpointMessageListener getEndpointMessageListener() { + if (messageListener == null) { + createMessageListener(getEndpoint(), getProcessor()); + } return messageListener; } - + protected void createMessageListener(JmsEndpoint endpoint, Processor processor) { messageListener = new EndpointMessageListener(endpoint, processor); messageListener.setBinding(endpoint.getBinding()); } + protected void createMessageListenerContainer() { + listenerContainer = getEndpoint().createMessageListenerContainer(); + getEndpoint().configureListenerContainer(listenerContainer); + listenerContainer.setMessageListener(getEndpointMessageListener()); + } + @Override protected void doStart() throws Exception { super.doStart(); + + // create listener container + if (listenerContainer == null) { + createMessageListenerContainer(); + } + listenerContainer.afterPropertiesSet(); listenerContainer.start(); } @@ -62,6 +82,11 @@ protected void doStop() throws Exception { listenerContainer.stop(); listenerContainer.destroy(); + + // null container and listener so they are fully re created if this consumer is restarted + // then we will use updated configuration from jms endpoint that may have been managed using JMX + 
listenerContainer = null; + messageListener = null; super.doStop(); } } Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=810399&r1=810398&r2=810399&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Wed Sep 2 07:52:47 2009 @@ -36,12 +36,15 @@ import org.apache.camel.impl.DefaultExchange; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.HeaderFilterStrategyAware; +import org.apache.camel.spi.ManagementAware; import org.springframework.core.task.TaskExecutor; import org.springframework.jms.core.JmsOperations; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.listener.AbstractMessageListenerContainer; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.destination.DestinationResolver; +import org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.jmx.export.annotation.ManagedResource; import org.springframework.transaction.PlatformTransactionManager; /** @@ -49,7 +52,8 @@ * * @version $Revision:520964 $ */ -public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { +@ManagedResource(description = "Managed JMS Endpoint") +public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, ManagementAware<JmsEndpoint> { private HeaderFilterStrategy headerFilterStrategy; private boolean pubSubDomain; private JmsBinding binding; @@ -158,21 +162,16 @@ return answer; } - public JmsConsumer createConsumer(Processor processor) throws Exception { 
AbstractMessageListenerContainer listenerContainer = configuration.createMessageListenerContainer(this); return createConsumer(processor, listenerContainer); } - /** - * Creates a consumer using the given processor and listener container - * - * @param processor the processor to use to process the messages - * @param listenerContainer the listener container - * @return a newly created consumer - * @throws Exception if the consumer cannot be created - */ - public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer listenerContainer) throws Exception { + public AbstractMessageListenerContainer createMessageListenerContainer() { + return configuration.createMessageListenerContainer(this); + } + + public void configureListenerContainer(AbstractMessageListenerContainer listenerContainer) { if (destinationName != null) { listenerContainer.setDestinationName(destinationName); } else if (destination != null) { @@ -186,6 +185,18 @@ } } listenerContainer.setPubSubDomain(pubSubDomain); + } + + /** + * Creates a consumer using the given processor and listener container + * + * @param processor the processor to use to process the messages + * @param listenerContainer the listener container + * @return a newly created consumer + * @throws Exception if the consumer cannot be created + */ + public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer listenerContainer) throws Exception { + configureListenerContainer(listenerContainer); return new JmsConsumer(this, processor, listenerContainer); } @@ -222,6 +233,10 @@ return configuration.createInOutTemplate(this, pubSubDomain, destinationName, configuration.getRequestTimeout()); } + public Object getManagedObject(JmsEndpoint endpoint) { + return this; + } + // Properties // ------------------------------------------------------------------------- public HeaderFilterStrategy getHeaderFilterStrategy() { @@ -384,6 +399,7 @@ return getConfiguration().getClientId(); } + 
@ManagedAttribute public int getConcurrentConsumers() { return getConfiguration().getConcurrentConsumers(); } @@ -596,6 +612,7 @@ getConfiguration().setClientId(consumerClientId); } + @ManagedAttribute public void setConcurrentConsumers(int concurrentConsumers) { getConfiguration().setConcurrentConsumers(concurrentConsumers); } @@ -616,7 +633,6 @@ getConfiguration().setDestinationResolver(destinationResolver); } - public void setDisableReplyTo(boolean disableReplyTo) { getConfiguration().setDisableReplyTo(disableReplyTo); } @@ -801,6 +817,17 @@ getConfiguration().setTransferException(transferException); } + @ManagedAttribute(description = "Camel id") + public String getCamelId() { + return getCamelContext().getName(); + } + + @ManagedAttribute(description = "Endpoint Uri") + @Override + public String getEndpointUri() { + return super.getEndpointUri(); + } + // Implementation methods //------------------------------------------------------------------------- Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java?rev=810399&r1=810398&r2=810399&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java Wed Sep 2 07:52:47 2009 @@ -26,12 +26,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.jms.core.JmsOperations; +import org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.jmx.export.annotation.ManagedOperation; +import org.springframework.jmx.export.annotation.ManagedResource; /** * An endpoint for a JMS Queue which is also browsable * * 
@version $Revision$ */ +@ManagedResource(description = "Managed JMS Queue Endpoint") public class JmsQueueEndpoint extends JmsEndpoint implements BrowsableEndpoint { private static final transient Log LOG = LogFactory.getLog(JmsQueueEndpoint.class); @@ -72,6 +76,7 @@ queueBrowseStrategy = createQueueBrowseStrategy(); } + @ManagedAttribute public int getMaximumBrowseSize() { return maximumBrowseSize; } @@ -80,6 +85,7 @@ * If a number is set > 0 then this limits the number of messages that are * returned when browsing the queue */ + @ManagedAttribute public void setMaximumBrowseSize(int maximumBrowseSize) { this.maximumBrowseSize = maximumBrowseSize; } @@ -93,6 +99,17 @@ return queueBrowseStrategy.browse(template, queue, this); } + @ManagedOperation(description = "Current number of Exchanges in Queue") + public long qeueSize() { + return getExchanges().size(); + } + + @ManagedOperation(description = "Get Exchange from queue by index") + public Exchange browseExchange(Integer index) { + return getExchanges().get(index); + } + + protected QueueBrowseStrategy createQueueBrowseStrategy() { QueueBrowseStrategy answer = null; try { Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java?rev=810399&view=auto ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java (added) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java Wed Sep 2 07:52:47 2009 @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms; + +import javax.jms.ConnectionFactory; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; +import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge; + +/** + * @version $Revision$ + */ +public class JmsConsumerRestartPickupConfigurationChangesTest extends CamelTestSupport { + + @Test + public void testRestartJmsConsumerPickupChanges() throws Exception { + JmsEndpoint endpoint = context.getEndpoint("activemq:queue:foo", JmsEndpoint.class); + JmsConsumer consumer = endpoint.createConsumer(new Processor() { + public void process(Exchange exchange) throws Exception { + template.send("mock:result", exchange); + } + }); + + consumer.start(); + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedBodiesReceived("Hello World"); + template.sendBody("activemq:queue:foo", "Hello World"); + assertMockEndpointsSatisfied(); + + consumer.stop(); + + // change to listen on another queue + endpoint.setDestinationName("bar"); + 
endpoint.setConcurrentConsumers(2); + + // restart it + consumer.start(); + + result.reset(); + result.expectedBodiesReceived("Bye World"); + template.sendBody("activemq:queue:bar", "Bye World"); + assertMockEndpointsSatisfied(); + + consumer.stop(); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory)); + + return camelContext; + } + +} Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java?rev=810399&r1=810398&r2=810399&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java (original) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java Wed Sep 2 07:52:47 2009 @@ -16,7 +16,6 @@ */ package org.apache.camel.component.jms; - import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; @@ -35,8 +34,6 @@ import org.springframework.jms.listener.DefaultMessageListenerContainer; import 
static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge; - - /** * @version $Revision$ */