Repository: camel Updated Branches: refs/heads/master c34177d71 -> 5d1eebccc
fix for https://issues.apache.org/jira/browse/CAMEL-7230 SJMS does not respect QoS settings (ttl/persistence) for sending to queues Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5d1eebcc Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5d1eebcc Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5d1eebcc Branch: refs/heads/master Commit: 5d1eebccce0e88e053be100c401a97ecb10e8908 Parents: c34177d Author: Christian Posta <christian.po...@gmail.com> Authored: Thu Feb 20 13:47:08 2014 -0700 Committer: Christian Posta <christian.po...@gmail.com> Committed: Thu Feb 20 13:47:08 2014 -0700 ---------------------------------------------------------------------- components/camel-sjms/pom.xml | 5 + .../component/sjms/producer/InOnlyProducer.java | 8 +- .../component/sjms/producer/InOutProducer.java | 7 +- .../sjms/producer/QueueProduerQoSTest.java | 112 +++++++++++++++++++ .../component/sjms/support/JmsTestSupport.java | 26 ++++- 5 files changed, 146 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5d1eebcc/components/camel-sjms/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-sjms/pom.xml b/components/camel-sjms/pom.xml index ee9d55c..a4c0083 100644 --- a/components/camel-sjms/pom.xml +++ b/components/camel-sjms/pom.xml @@ -77,6 +77,11 @@ <artifactId>activemq-broker</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-kahadb-store</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> http://git-wip-us.apache.org/repos/asf/camel/blob/5d1eebcc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java index 84eb1f5..e841e6b 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java @@ -70,12 +70,8 @@ public class InOnlyProducer extends SjmsProducer { session = conn.createSession(false, getAcknowledgeMode()); } - MessageProducer messageProducer; - if (isTopic()) { - messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl()); - } else { - messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName()); - } + MessageProducer messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl()); + answer = new MessageProducerResources(session, messageProducer, commitStrategy); } catch (Exception e) { log.error("Unable to create the MessageProducer: " + e.getLocalizedMessage()); http://git-wip-us.apache.org/repos/asf/camel/blob/5d1eebcc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java index 0936ecf..2b93df7 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java @@ -248,11 +248,8 @@ public class InOutProducer extends SjmsProducer { } else { session = conn.createSession(false, getAcknowledgeMode()); } - if (isTopic()) { - messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl()); - } else { - messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName()); - } + + messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl()); if (session == null) { throw new CamelException("Message Consumer Creation Exception: Session is NULL"); http://git-wip-us.apache.org/repos/asf/camel/blob/5d1eebcc/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProduerQoSTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProduerQoSTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProduerQoSTest.java new file mode 100644 index 0000000..36d0827 --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProduerQoSTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.producer; + + +import java.util.concurrent.TimeUnit; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.sjms.support.JmsTestSupport; +import org.junit.Test; + + +public class QueueProduerQoSTest extends JmsTestSupport { + + private static final String TEST_INONLY_DESTINATION_NAME = "queue.producer.test.qos.inonly"; + private static final String TEST_INOUT_DESTINATION_NAME = "queue.producer.test.qos.inout"; + + private static final String EXPIRED_MESSAGE_ROUTE_ID = "expiredAdvisoryRoute"; + + @Test + public void testInOutQueueProducerTTL() throws Exception { + + NotifyBuilder expireMatcher = new NotifyBuilder(context) + .fromRoute(EXPIRED_MESSAGE_ROUTE_ID).whenCompleted(1).create(); + + String endpoint = String.format("sjms:queue:%s?ttl=1000&exchangePattern=InOut&responseTimeOut=500", TEST_INOUT_DESTINATION_NAME); + + try { + template.requestBody(endpoint, "test message"); + fail("we aren't expecting any consumers, so should not succeed"); + } catch (Exception e) { + // we are expecting an exception here because there are no consumers on this queue, + // so we will not be able to do a real InOut/request-response, but that's okay + // we're just interested in the message becoming expired + } + + // we should delay a bit so broker can run its expiration processes... + expireMatcher.matches(2, TimeUnit.SECONDS); + + DestinationViewMBean queue = getQueueMBean(TEST_INOUT_DESTINATION_NAME); + assertEquals("There were unexpected messages left in the queue: " + TEST_INOUT_DESTINATION_NAME, + 0, queue.getQueueSize()); + } + + @Test + public void testInOnlyQueueProducerTTL() throws Exception { + NotifyBuilder expireMatcher = new NotifyBuilder(context) + .fromRoute(EXPIRED_MESSAGE_ROUTE_ID).whenCompleted(1).create(); + + String endpoint = String.format("sjms:queue:%s?ttl=1000", TEST_INONLY_DESTINATION_NAME); + template.sendBody(endpoint, "test message"); + + // we should delay a bit so broker can run its expiration processes... + expireMatcher.matches(2, TimeUnit.SECONDS); + + + DestinationViewMBean queue = getQueueMBean(TEST_INONLY_DESTINATION_NAME); + assertEquals("There were unexpected messages left in the queue: " + TEST_INONLY_DESTINATION_NAME, + 0, queue.getQueueSize()); + } + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + broker.setUseJmx(true); + broker.setPersistent(true); + broker.setDataDirectory("target/activemq-data"); + broker.deleteAllMessages(); + broker.setAdvisorySupport(true); + broker.addConnector(brokerUri); + + // configure expiration rate + ActiveMQQueue queueName = new ActiveMQQueue(">"); + PolicyEntry entry = new PolicyEntry(); + entry.setDestination(queueName); + entry.setExpireMessagesPeriod(1000); + + PolicyMap policyMap = new PolicyMap(); + policyMap.put(queueName, entry); + broker.setDestinationPolicy(policyMap); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("sjms:topic:ActiveMQ.Advisory.Expired.Queue.>") + .routeId(EXPIRED_MESSAGE_ROUTE_ID) + .to("mock:expiredAdvisory"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5d1eebcc/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java index 1613a68..6bf4ff8 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java @@ -18,9 +18,12 @@ package org.apache.camel.component.sjms.support; import javax.jms.Connection; import javax.jms.Session; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.camel.CamelContext; import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; @@ -29,6 +32,7 @@ import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit4.CamelTestSupport; + /** * A support class that builds up and tears down an ActiveMQ instance to be used * for unit testing. @@ -37,7 +41,7 @@ public class JmsTestSupport extends CamelTestSupport { @Produce protected ProducerTemplate template; - private String brokerUri; + protected String brokerUri; private BrokerService broker; private Connection connection; private Session session; @@ -54,11 +58,20 @@ public class JmsTestSupport extends CamelTestSupport { brokerUri = "tcp://localhost:" + AvailablePortFinder.getNextAvailable(33333); broker = new BrokerService(); + configureBroker(broker); + startBroker(); + } + + protected void configureBroker(BrokerService broker) throws Exception { broker.setUseJmx(true); broker.setPersistent(false); broker.deleteAllMessages(); broker.addConnector(brokerUri); + } + + private void startBroker() throws Exception { broker.start(); + broker.waitUntilStarted(); } @Override @@ -104,6 +117,17 @@ public class JmsTestSupport extends CamelTestSupport { return camelContext; } + public DestinationViewMBean getQueueMBean(String queueName) throws MalformedObjectNameException { + return getDestinationMBean(queueName, false); + } + public DestinationViewMBean getDestinationMBean(String destinationName, boolean topic) throws MalformedObjectNameException { + String domain = "org.apache.activemq"; + String destinationType = topic ? "Topic" : "Queue"; + ObjectName name = new ObjectName(String.format("%s:type=Broker,brokerName=localhost,destinationType=%s,destinationName=%s", + domain, destinationType, destinationName)); + return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); + } + public void setSession(Session session) { this.session = session; }