Repository: camel Updated Branches: refs/heads/master 65f9a3ab3 -> 832a99c54
Refactored logic around parsing destination names from the URI. Rejecting topics from batch consumption, as it does not make sense conceptually. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2ba152d3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2ba152d3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2ba152d3 Branch: refs/heads/master Commit: 2ba152d387d23d2f0f4dc2297984d4173b147a5e Parents: ab1d1dd Author: jkorab <jakub.ko...@gmail.com> Authored: Fri Jul 17 11:42:47 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jul 17 14:56:22 2015 +0200 ---------------------------------------------------------------------- .../camel/component/sjms/SjmsComponent.java | 2 +- .../camel/component/sjms/SjmsEndpoint.java | 22 ++------ .../component/sjms/batch/SjmsBatchConsumer.java | 15 +++--- .../component/sjms/batch/SjmsBatchEndpoint.java | 19 +++++-- .../sjms/jms/DestinationNameParser.java | 36 +++++++++++++ .../sjms/batch/SjmsBatchConsumerTest.java | 49 ++++++++++++++--- .../sjms/batch/SjmsBatchEndpointTest.java | 23 ++++++-- .../sjms/jms/DestinationNameParserTest.java | 56 ++++++++++++++++++++ .../sjms/producer/QueueProducerQoSTest.java | 23 ++++---- 9 files changed, 192 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java index f2eebc6..2503162 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java +++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java @@ -61,7 +61,7 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { validateMepAndReplyTo(parameters); uri = normalizeUri(uri); - SjmsEndpoint endpoint = new SjmsEndpoint(uri, this); + SjmsEndpoint endpoint = new SjmsEndpoint(uri, this, remaining); setProperties(endpoint, parameters); if (endpoint.isTransacted()) { endpoint.setSynchronous(true); http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java index 67774d4..0e8d68a 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java @@ -22,11 +22,7 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.component.sjms.jms.ConnectionResource; -import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy; -import org.apache.camel.component.sjms.jms.DestinationCreationStrategy; -import org.apache.camel.component.sjms.jms.KeyFormatStrategy; -import org.apache.camel.component.sjms.jms.SessionAcknowledgementType; +import org.apache.camel.component.sjms.jms.*; import org.apache.camel.component.sjms.producer.InOnlyProducer; import org.apache.camel.component.sjms.producer.InOutProducer; import org.apache.camel.impl.DefaultEndpoint; @@ -95,19 +91,11 @@ public class SjmsEndpoint 
extends DefaultEndpoint implements MultipleConsumersSu public SjmsEndpoint() { } - public SjmsEndpoint(String uri, Component component) { + public SjmsEndpoint(String uri, Component component, String remaining) { super(uri, component); - if (getEndpointUri().contains("://queue:")) { - topic = false; - } else if (getEndpointUri().contains("://topic:")) { - topic = true; - } else { - throw new IllegalArgumentException("Endpoint URI unsupported: " + uri); - } - destinationName = getEndpointUri().substring(getEndpointUri().lastIndexOf(":") + 1); - if (destinationName.contains("?")) { - destinationName = destinationName.substring(0, destinationName.lastIndexOf("?")); - } + DestinationNameParser parser = new DestinationNameParser(); + topic = parser.isTopic(remaining); + this.destinationName = parser.getShortName(remaining); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java index 613d471..ca47c7c 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java @@ -16,13 +16,11 @@ */ package org.apache.camel.component.sjms.batch; -import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.processor.aggregate.AggregationStrategy; -import org.apache.camel.spi.Synchronization; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -90,7 +88,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { } @Override - public Endpoint getEndpoint() { + public SjmsBatchEndpoint getEndpoint() { return sjmsBatchEndpoint; } @@ -164,10 +162,11 @@ public class SjmsBatchConsumer extends DefaultConsumer { // a batch corresponds to a single session that will be committed or rolled back by a background thread final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE); try { - // destinationName only creates queues; there is no additional value to be gained - // by transactionally consuming topic messages in batches + // only batch consumption from queues is supported - it makes no sense to transactionally consume + // from a topic as you don't care about message loss, users can just use a regular aggregator instead Queue queue = session.createQueue(destinationName); MessageConsumer consumer = session.createConsumer(queue); + try { consumeBatchesOnLoop(session, consumer); } finally { @@ -289,11 +288,11 @@ public class SjmsBatchConsumer extends DefaultConsumer { int id = batchCount.getAndIncrement(); int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class); if (LOG.isDebugEnabled()) { - LOG.debug("Processing batch:size={}:total={}", batchSize, messagesReceived.addAndGet(batchSize)); + LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + messagesReceived.addAndGet(batchSize)); } - Synchronization committing = new SessionCompletion(session); - exchange.addOnCompletion(committing); + SessionCompletion sessionCompletion = new SessionCompletion(session); + exchange.addOnCompletion(sessionCompletion); try { processor.process(exchange); LOG.debug("Completed processing[{}]:total={}", id, messagesProcessed.addAndGet(batchSize)); http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java 
---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java index afd5cbe..5d307a7 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java @@ -20,6 +20,7 @@ import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.component.sjms.jms.DestinationNameParser; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.spi.Metadata; @@ -30,7 +31,10 @@ import org.apache.camel.spi.UriPath; /** * @author jkorab */ -@UriEndpoint(scheme = "sjmsBatch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy", consumerClass = SjmsBatchComponent.class, label = "messaging") +@UriEndpoint(scheme = "sjmsBatch", + title = "Simple JMS Batch Component", + syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy", + consumerClass = SjmsBatchComponent.class, label = "messaging") public class SjmsBatchEndpoint extends DefaultEndpoint { public static final int DEFAULT_COMPLETION_SIZE = 200; // the default dispatch queue size in ActiveMQ @@ -38,7 +42,8 @@ public class SjmsBatchEndpoint extends DefaultEndpoint { public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize"; @UriPath - @Metadata(required = "true") + @Metadata(required = "true", + description = "The destination name. 
Only queues are supported, names may be prefixed by 'queue:'.") private String destinationName; @UriParam(label = "consumer", defaultValue = "1", description = "The number of JMS sessions to consume from") @@ -61,11 +66,18 @@ public class SjmsBatchEndpoint extends DefaultEndpoint { @UriParam(label = "consumer", description = "A #-reference to an AggregationStrategy visible to Camel") private AggregationStrategy aggregationStrategy; + private boolean topic; + public SjmsBatchEndpoint() {} public SjmsBatchEndpoint(String endpointUri, Component component, String remaining) { super(endpointUri, component); - this.destinationName = remaining; + DestinationNameParser parser = new DestinationNameParser(); + if (parser.isTopic(remaining)) { + throw new IllegalArgumentException("Only batch consumption from queues is supported. For topics you " + + "should use a regular JMS consumer with an aggregator."); + } + this.destinationName = parser.getShortName(remaining); } @Override @@ -130,4 +142,5 @@ public class SjmsBatchEndpoint extends DefaultEndpoint { public void setPollDuration(Integer pollDuration) { this.pollDuration = pollDuration; } + } http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java new file mode 100644 index 0000000..d248350 --- /dev/null +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.jms; + +/** + * @author jkorab + */ +public class DestinationNameParser { + public boolean isTopic(String destinationName) { + if (destinationName == null) { + throw new IllegalArgumentException("destinationName is null"); + } + return destinationName.startsWith("topic:"); + } + + public String getShortName(String destinationName) { + if (destinationName == null) { + throw new IllegalArgumentException("destinationName is null"); + } + return destinationName.substring(destinationName.lastIndexOf(":") + 1); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java index 58fc717..642a38f 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java @@ -17,9 +17,11 @@ package org.apache.camel.component.sjms.batch; import org.apache.activemq.ActiveMQConnectionFactory; +import 
org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; +import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.sjms.SjmsComponent; @@ -60,7 +62,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { CamelContext context = new DefaultCamelContext(registry); context.addComponent("sjms", sjmsComponent); - context.addComponent("sjmsbatch", sjmsBatchComponent); + context.addComponent("sjms-batch", sjmsBatchComponent); return context; } @@ -100,7 +102,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { int completionTimeout = 1000; int completionSize = 200; - fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s" + + fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s" + "&consumerCount=%s&aggregationStrategy=#testStrategy", queueName, completionTimeout, completionSize, consumerCount) .routeId("batchConsumer").startupOrder(10).autoStartup(false) @@ -138,7 +140,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { context.addRoutes(new TransactedSendHarness(queueName)); context.addRoutes(new RouteBuilder() { public void configure() throws Exception { - fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", + fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10) .log(LoggingLevel.DEBUG, "${body.size}") .to("mock:batches"); @@ -163,7 +165,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { context.addRoutes(new TransactedSendHarness(queueName)); context.addRoutes(new RouteBuilder() { public void configure() throws Exception { - fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", + 
fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10) .to("mock:batches"); } @@ -199,11 +201,11 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { .toF("sjms:%s", queueName + "B") .end(); - fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", + fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", queueName + "A", completionTimeout, completionSize).routeId("batchConsumerA") .to("mock:outA"); - fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", + fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", queueName + "B", completionTimeout, completionSize).routeId("batchConsumerB") .to("mock:outB"); @@ -226,6 +228,39 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { assertFirstMessageBodyOfLength(mockOutB, messageCount); } + @Test + public void testConsumption_rollback() throws Exception { + final int completionTimeout = 2000; + final int completionSize = 5; + + final String queueName = getQueueName(); + context.addRoutes(new TransactedSendHarness(queueName)); + context.addRoutes(new RouteBuilder() { + public void configure() throws Exception { + fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", + queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10) + .to("mock:batches"); + } + }); + context.start(); + + int messageCount = 5; + MockEndpoint mockBatches = getMockEndpoint("mock:batches"); + // the first time around, the batch should throw an exception + mockBatches.whenExchangeReceived(1, new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + throw new RuntimeException("Boom!"); + } + }); + // so the batch should be processed twice due 
to redelivery + mockBatches.expectedMessageCount(2); + + template.sendBody("direct:in", generateStrings(messageCount)); + mockBatches.assertIsSatisfied(); + + } + private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) { Exchange exchange = mockEndpoint.getExchanges().get(0); assertEquals(expectedLength, exchange.getIn().getBody(List.class).size()); @@ -233,7 +268,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { private String getQueueName() { SimpleDateFormat sdf = new SimpleDateFormat("yyMMddhhmmss"); - return "sjmsbatch-" + sdf.format(new Date()); + return "sjms-batch-" + sdf.format(new Date()); } private String[] generateStrings(int messageCount) { http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java index 7a75e76..c97d0dd 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java @@ -18,6 +18,8 @@ package org.apache.camel.component.sjms.batch; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.camel.CamelContext; +import org.apache.camel.FailedToCreateProducerException; +import org.apache.camel.FailedToCreateRouteException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.sjms.SjmsComponent; import org.apache.camel.impl.DefaultCamelContext; @@ -68,7 +70,7 @@ public class SjmsBatchEndpointTest extends CamelTestSupport { sjmsBatchComponent.setConnectionFactory(connectionFactory); CamelContext context = new 
DefaultCamelContext(registry); - context.addComponent("sjmsbatch", sjmsBatchComponent); + context.addComponent("sjms-batch", sjmsBatchComponent); context.addComponent("sjms", sjmsComponent); return context; @@ -79,11 +81,11 @@ public class SjmsBatchEndpointTest extends CamelTestSupport { return true; } - @Test(expected = org.apache.camel.FailedToCreateProducerException.class) + @Test(expected = FailedToCreateProducerException.class) public void testProducerFailure() throws Exception { context.addRoutes(new RouteBuilder() { public void configure() throws Exception { - from("direct:in").to("sjmsbatch:testQueue"); + from("direct:in").to("sjms-batch:testQueue"); } }); context.start(); @@ -94,7 +96,7 @@ public class SjmsBatchEndpointTest extends CamelTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("sjmsbatch:in?aggregationStrategy=#aggStrategy&pollDuration=-1") + from("sjms-batch:in?aggregationStrategy=#aggStrategy&pollDuration=-1") .to("mock:out"); } }); @@ -106,11 +108,22 @@ public class SjmsBatchEndpointTest extends CamelTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("sjmsbatch:in?aggregationStrategy=#aggStrategy&consumerCount=-1") + from("sjms-batch:in?aggregationStrategy=#aggStrategy&consumerCount=-1") .to("mock:out"); } }); context.start(); } + @Test(expected = FailedToCreateRouteException.class) + public void testConsumer_topic() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("sjms-batch:topic:in?aggregationStrategy=#aggStrategy") + .to("mock:out"); + } + }); + context.start(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java ---------------------------------------------------------------------- diff --git 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java new file mode 100644 index 0000000..3fb4968 --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.sjms.jms; + +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * @author jkorab + */ +public class DestinationNameParserTest { + + @Test + public void testIsTopic() throws Exception { + DestinationNameParser parser = new DestinationNameParser(); + assertTrue(parser.isTopic("topic:foo")); + assertFalse(parser.isTopic("queue:bar")); + assertFalse(parser.isTopic("bar")); + } + + @Test(expected = IllegalArgumentException.class) + public void testIsTopic_nullDestinationName() throws Exception { + DestinationNameParser parser = new DestinationNameParser(); + parser.isTopic(null); + } + + @Test + public void testGetShortName() throws Exception { + DestinationNameParser parser = new DestinationNameParser(); + assertEquals("foo", parser.getShortName("topic:foo")); + assertFalse("bar", parser.isTopic("queue:bar")); + assertFalse("bar", parser.isTopic("bar")); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetShortName_nullDestinationName() throws Exception { + DestinationNameParser parser = new DestinationNameParser(); + parser.getShortName(null); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java index c82842a..beef0a9 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java @@ -16,14 +16,14 @@ */ package org.apache.camel.component.sjms.producer; -import java.util.concurrent.TimeUnit; import 
org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; -import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.sjms.support.JmsTestSupport; import org.junit.Test; @@ -33,12 +33,14 @@ public class QueueProducerQoSTest extends JmsTestSupport { private static final String TEST_INOUT_DESTINATION_NAME = "queue.producer.test.qos.inout"; private static final String EXPIRED_MESSAGE_ROUTE_ID = "expiredAdvisoryRoute"; + public static final String MOCK_EXPIRED_ADVISORY = "mock:expiredAdvisory"; + + @EndpointInject(uri = MOCK_EXPIRED_ADVISORY) + MockEndpoint mockExpiredAdvisory; @Test public void testInOutQueueProducerTTL() throws Exception { - - NotifyBuilder expireMatcher = new NotifyBuilder(context) - .fromRoute(EXPIRED_MESSAGE_ROUTE_ID).whenCompleted(1).create(); + mockExpiredAdvisory.expectedMessageCount(1); String endpoint = String.format("sjms:queue:%s?ttl=1000&exchangePattern=InOut&responseTimeOut=500", TEST_INOUT_DESTINATION_NAME); @@ -51,8 +53,7 @@ public class QueueProducerQoSTest extends JmsTestSupport { // we're just interested in the message becoming expired } - // we should delay a bit so broker can run its expiration processes... 
- assertFalse(expireMatcher.matches(2, TimeUnit.SECONDS)); + assertMockEndpointsSatisfied(); DestinationViewMBean queue = getQueueMBean(TEST_INOUT_DESTINATION_NAME); assertEquals("There were unexpected messages left in the queue: " + TEST_INOUT_DESTINATION_NAME, @@ -61,14 +62,12 @@ public class QueueProducerQoSTest extends JmsTestSupport { @Test public void testInOnlyQueueProducerTTL() throws Exception { - NotifyBuilder expireMatcher = new NotifyBuilder(context) - .fromRoute(EXPIRED_MESSAGE_ROUTE_ID).whenCompleted(1).create(); + mockExpiredAdvisory.expectedMessageCount(1); String endpoint = String.format("sjms:queue:%s?ttl=1000", TEST_INONLY_DESTINATION_NAME); template.sendBody(endpoint, "test message"); - // we should delay a bit so broker can run its expiration processes... - assertFalse(expireMatcher.matches(2, TimeUnit.SECONDS)); + assertMockEndpointsSatisfied(); DestinationViewMBean queue = getQueueMBean(TEST_INONLY_DESTINATION_NAME); assertEquals("There were unexpected messages left in the queue: " + TEST_INONLY_DESTINATION_NAME, @@ -102,7 +101,7 @@ public class QueueProducerQoSTest extends JmsTestSupport { public void configure() throws Exception { from("sjms:topic:ActiveMQ.Advisory.Expired.Queue.>") .routeId(EXPIRED_MESSAGE_ROUTE_ID) - .to("mock:expiredAdvisory"); + .to(MOCK_EXPIRED_ADVISORY); } }; }