Repository: camel
Updated Branches:
  refs/heads/master 65f9a3ab3 -> 832a99c54


Refactored logic around parsing destination names from the URI. Rejecting 
topics from batch consumption, as it does not make sense conceptually.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2ba152d3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2ba152d3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2ba152d3

Branch: refs/heads/master
Commit: 2ba152d387d23d2f0f4dc2297984d4173b147a5e
Parents: ab1d1dd
Author: jkorab <jakub.ko...@gmail.com>
Authored: Fri Jul 17 11:42:47 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Jul 17 14:56:22 2015 +0200

----------------------------------------------------------------------
 .../camel/component/sjms/SjmsComponent.java     |  2 +-
 .../camel/component/sjms/SjmsEndpoint.java      | 22 ++------
 .../component/sjms/batch/SjmsBatchConsumer.java | 15 +++---
 .../component/sjms/batch/SjmsBatchEndpoint.java | 19 +++++--
 .../sjms/jms/DestinationNameParser.java         | 36 +++++++++++++
 .../sjms/batch/SjmsBatchConsumerTest.java       | 49 ++++++++++++++---
 .../sjms/batch/SjmsBatchEndpointTest.java       | 23 ++++++--
 .../sjms/jms/DestinationNameParserTest.java     | 56 ++++++++++++++++++++
 .../sjms/producer/QueueProducerQoSTest.java     | 23 ++++----
 9 files changed, 192 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index f2eebc6..2503162 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -61,7 +61,7 @@ public class SjmsComponent extends UriEndpointComponent 
implements HeaderFilterS
     protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
         validateMepAndReplyTo(parameters);
         uri = normalizeUri(uri);
-        SjmsEndpoint endpoint = new SjmsEndpoint(uri, this);
+        SjmsEndpoint endpoint = new SjmsEndpoint(uri, this, remaining);
         setProperties(endpoint, parameters);
         if (endpoint.isTransacted()) {
             endpoint.setSynchronous(true);

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 67774d4..0e8d68a 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -22,11 +22,7 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.component.sjms.jms.ConnectionResource;
-import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy;
-import org.apache.camel.component.sjms.jms.DestinationCreationStrategy;
-import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
-import org.apache.camel.component.sjms.jms.SessionAcknowledgementType;
+import org.apache.camel.component.sjms.jms.*;
 import org.apache.camel.component.sjms.producer.InOnlyProducer;
 import org.apache.camel.component.sjms.producer.InOutProducer;
 import org.apache.camel.impl.DefaultEndpoint;
@@ -95,19 +91,11 @@ public class SjmsEndpoint extends DefaultEndpoint 
implements MultipleConsumersSu
     public SjmsEndpoint() {
     }
 
-    public SjmsEndpoint(String uri, Component component) {
+    public SjmsEndpoint(String uri, Component component, String remaining) {
         super(uri, component);
-        if (getEndpointUri().contains("://queue:")) {
-            topic = false;
-        } else if (getEndpointUri().contains("://topic:")) {
-            topic = true;
-        } else {
-            throw new IllegalArgumentException("Endpoint URI unsupported: " + 
uri);
-        }
-        destinationName = 
getEndpointUri().substring(getEndpointUri().lastIndexOf(":") + 1);
-        if (destinationName.contains("?")) {
-            destinationName = destinationName.substring(0, 
destinationName.lastIndexOf("?"));
-        }
+        DestinationNameParser parser = new DestinationNameParser();
+        topic = parser.isTopic(remaining);
+        this.destinationName = parser.getShortName(remaining);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 613d471..ca47c7c 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -16,13 +16,11 @@
  */
 package org.apache.camel.component.sjms.batch;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,7 +88,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
     }
 
     @Override
-    public Endpoint getEndpoint() {
+    public SjmsBatchEndpoint getEndpoint() {
         return sjmsBatchEndpoint;
     }
 
@@ -164,10 +162,11 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 // a batch corresponds to a single session that will be 
committed or rolled back by a background thread
                 final Session session = connection.createSession(TRANSACTED, 
Session.CLIENT_ACKNOWLEDGE);
                 try {
-                    // destinationName only creates queues; there is no 
additional value to be gained
-                    // by transactionally consuming topic messages in batches
+                    // only batch consumption from queues is supported - it 
makes no sense to transactionally consume
+                    // from a topic as you don't care about message loss, users 
can just use a regular aggregator instead
                     Queue queue = session.createQueue(destinationName);
                     MessageConsumer consumer = session.createConsumer(queue);
+
                     try {
                         consumeBatchesOnLoop(session, consumer);
                     } finally {
@@ -289,11 +288,11 @@ public class SjmsBatchConsumer extends DefaultConsumer {
             int id = batchCount.getAndIncrement();
             int batchSize = 
exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Processing batch:size={}:total={}", batchSize, 
messagesReceived.addAndGet(batchSize));
+                LOG.debug("Processing batch[" + id + "]:size=" + batchSize + 
":total=" + messagesReceived.addAndGet(batchSize));
             }
 
-            Synchronization committing = new SessionCompletion(session);
-            exchange.addOnCompletion(committing);
+            SessionCompletion sessionCompletion = new 
SessionCompletion(session);
+            exchange.addOnCompletion(sessionCompletion);
             try {
                 processor.process(exchange);
                 LOG.debug("Completed processing[{}]:total={}", id, 
messagesProcessed.addAndGet(batchSize));

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index afd5cbe..5d307a7 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -20,6 +20,7 @@ import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.component.sjms.jms.DestinationNameParser;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.Metadata;
@@ -30,7 +31,10 @@ import org.apache.camel.spi.UriPath;
 /**
  * @author jkorab
  */
-@UriEndpoint(scheme = "sjmsBatch", title = "Simple JMS Batch Component", 
syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy", 
consumerClass = SjmsBatchComponent.class, label = "messaging")
+@UriEndpoint(scheme = "sjmsBatch",
+        title = "Simple JMS Batch Component",
+        syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy",
+        consumerClass = SjmsBatchComponent.class, label = "messaging")
 public class SjmsBatchEndpoint extends DefaultEndpoint {
 
     public static final int DEFAULT_COMPLETION_SIZE = 200; // the default 
dispatch queue size in ActiveMQ
@@ -38,7 +42,8 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
     public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize";
 
     @UriPath
-    @Metadata(required = "true")
+    @Metadata(required = "true",
+            description = "The destination name. Only queues are supported, 
names may be prefixed by 'queue:'.")
     private String destinationName;
 
     @UriParam(label = "consumer", defaultValue = "1", description = "The 
number of JMS sessions to consume from")
@@ -61,11 +66,18 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
     @UriParam(label = "consumer", description = "A #-reference to an 
AggregationStrategy visible to Camel")
     private AggregationStrategy aggregationStrategy;
 
+    private boolean topic;
+
     public SjmsBatchEndpoint() {}
 
     public SjmsBatchEndpoint(String endpointUri, Component component, String 
remaining) {
         super(endpointUri, component);
-        this.destinationName = remaining;
+        DestinationNameParser parser = new DestinationNameParser();
+        if (parser.isTopic(remaining)) {
+            throw new IllegalArgumentException("Only batch consumption from 
queues is supported. For topics you " +
+                    "should use a regular JMS consumer with an aggregator.");
+        }
+        this.destinationName = parser.getShortName(remaining);
     }
 
     @Override
@@ -130,4 +142,5 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
     public void setPollDuration(Integer pollDuration) {
         this.pollDuration = pollDuration;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
new file mode 100644
index 0000000..d248350
--- /dev/null
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+/**
+ * @author jkorab
+ */
+public class DestinationNameParser {
+    public boolean isTopic(String destinationName) {
+        if (destinationName == null) {
+            throw new IllegalArgumentException("destinationName is null");
+        }
+        return destinationName.startsWith("topic:");
+    }
+
+    public String getShortName(String destinationName) {
+        if (destinationName == null) {
+            throw new IllegalArgumentException("destinationName is null");
+        }
+        return destinationName.substring(destinationName.lastIndexOf(":") + 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 58fc717..642a38f 100644
--- 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -17,9 +17,11 @@
 package org.apache.camel.component.sjms.batch;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.sjms.SjmsComponent;
@@ -60,7 +62,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
 
         CamelContext context = new DefaultCamelContext(registry);
         context.addComponent("sjms", sjmsComponent);
-        context.addComponent("sjmsbatch", sjmsBatchComponent);
+        context.addComponent("sjms-batch", sjmsBatchComponent);
         return context;
     }
 
@@ -100,7 +102,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport 
{
                 int completionTimeout = 1000;
                 int completionSize = 200;
 
-                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s" +
+                fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s" +
                         "&consumerCount=%s&aggregationStrategy=#testStrategy",
                         queueName, completionTimeout, completionSize, 
consumerCount)
                         
.routeId("batchConsumer").startupOrder(10).autoStartup(false)
@@ -138,7 +140,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport 
{
         context.addRoutes(new TransactedSendHarness(queueName));
         context.addRoutes(new RouteBuilder() {
             public void configure() throws Exception {
-                
fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                
fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
                         queueName, completionTimeout, 
completionSize).routeId("batchConsumer").startupOrder(10)
                         .log(LoggingLevel.DEBUG, "${body.size}")
                         .to("mock:batches");
@@ -163,7 +165,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport 
{
         context.addRoutes(new TransactedSendHarness(queueName));
         context.addRoutes(new RouteBuilder() {
             public void configure() throws Exception {
-                
fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                
fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
                         queueName, completionTimeout, 
completionSize).routeId("batchConsumer").startupOrder(10)
                         .to("mock:batches");
             }
@@ -199,11 +201,11 @@ public class SjmsBatchConsumerTest extends 
CamelTestSupport {
                         .toF("sjms:%s", queueName + "B")
                     .end();
 
-                
fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                
fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
                         queueName + "A", completionTimeout, 
completionSize).routeId("batchConsumerA")
                         .to("mock:outA");
 
-                
fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                
fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
                         queueName + "B", completionTimeout, 
completionSize).routeId("batchConsumerB")
                         .to("mock:outB");
 
@@ -226,6 +228,39 @@ public class SjmsBatchConsumerTest extends 
CamelTestSupport {
         assertFirstMessageBodyOfLength(mockOutB, messageCount);
     }
 
+    @Test
+    public void testConsumption_rollback() throws Exception {
+        final int completionTimeout = 2000;
+        final int completionSize = 5;
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                
fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                        queueName, completionTimeout, 
completionSize).routeId("batchConsumer").startupOrder(10)
+                        .to("mock:batches");
+            }
+        });
+        context.start();
+
+        int messageCount = 5;
+        MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+        // the first time around, the batch should throw an exception
+        mockBatches.whenExchangeReceived(1, new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                throw new RuntimeException("Boom!");
+            }
+        });
+        // so the batch should be processed twice due to redelivery
+        mockBatches.expectedMessageCount(2);
+
+        template.sendBody("direct:in", generateStrings(messageCount));
+        mockBatches.assertIsSatisfied();
+
+    }
+
     private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int 
expectedLength) {
         Exchange exchange = mockEndpoint.getExchanges().get(0);
         assertEquals(expectedLength, 
exchange.getIn().getBody(List.class).size());
@@ -233,7 +268,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport 
{
 
     private String getQueueName() {
         SimpleDateFormat sdf = new SimpleDateFormat("yyMMddhhmmss");
-        return "sjmsbatch-" + sdf.format(new Date());
+        return "sjms-batch-" + sdf.format(new Date());
     }
 
     private String[] generateStrings(int messageCount) {

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
index 7a75e76..c97d0dd 100644
--- 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
+++ 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.sjms.batch;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.FailedToCreateRouteException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.sjms.SjmsComponent;
 import org.apache.camel.impl.DefaultCamelContext;
@@ -68,7 +70,7 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
         sjmsBatchComponent.setConnectionFactory(connectionFactory);
 
         CamelContext context = new DefaultCamelContext(registry);
-        context.addComponent("sjmsbatch", sjmsBatchComponent);
+        context.addComponent("sjms-batch", sjmsBatchComponent);
         context.addComponent("sjms", sjmsComponent);
 
         return context;
@@ -79,11 +81,11 @@ public class SjmsBatchEndpointTest extends CamelTestSupport 
{
         return true;
     }
 
-    @Test(expected = org.apache.camel.FailedToCreateProducerException.class)
+    @Test(expected = FailedToCreateProducerException.class)
     public void testProducerFailure() throws Exception {
         context.addRoutes(new RouteBuilder() {
             public void configure() throws Exception {
-                from("direct:in").to("sjmsbatch:testQueue");
+                from("direct:in").to("sjms-batch:testQueue");
             }
         });
         context.start();
@@ -94,7 +96,7 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                
from("sjmsbatch:in?aggregationStrategy=#aggStrategy&pollDuration=-1")
+                
from("sjms-batch:in?aggregationStrategy=#aggStrategy&pollDuration=-1")
                     .to("mock:out");
             }
         });
@@ -106,11 +108,22 @@ public class SjmsBatchEndpointTest extends 
CamelTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                
from("sjmsbatch:in?aggregationStrategy=#aggStrategy&consumerCount=-1")
+                
from("sjms-batch:in?aggregationStrategy=#aggStrategy&consumerCount=-1")
                         .to("mock:out");
             }
         });
         context.start();
     }
 
+    @Test(expected = FailedToCreateRouteException.class)
+    public void testConsumer_topic() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("sjms-batch:topic:in?aggregationStrategy=#aggStrategy")
+                        .to("mock:out");
+            }
+        });
+        context.start();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
new file mode 100644
index 0000000..3fb4968
--- /dev/null
+++ 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author jkorab
+ */
+public class DestinationNameParserTest {
+
+    @Test
+    public void testIsTopic() throws Exception {
+        DestinationNameParser parser = new DestinationNameParser();
+        assertTrue(parser.isTopic("topic:foo"));
+        assertFalse(parser.isTopic("queue:bar"));
+        assertFalse(parser.isTopic("bar"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIsTopic_nullDestinationName() throws Exception {
+        DestinationNameParser parser = new DestinationNameParser();
+        parser.isTopic(null);
+    }
+
+    @Test
+    public void testGetShortName() throws Exception {
+        DestinationNameParser parser = new DestinationNameParser();
+        assertEquals("foo", parser.getShortName("topic:foo"));
+        assertFalse("bar", parser.isTopic("queue:bar"));
+        assertFalse("bar", parser.isTopic("bar"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testGetShortName_nullDestinationName() throws Exception {
+        DestinationNameParser parser = new DestinationNameParser();
+        parser.getShortName(null);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
index c82842a..beef0a9 100644
--- 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
+++ 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
@@ -16,14 +16,14 @@
  */
 package org.apache.camel.component.sjms.producer;
 
-import java.util.concurrent.TimeUnit;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.sjms.support.JmsTestSupport;
 import org.junit.Test;
 
@@ -33,12 +33,14 @@ public class QueueProducerQoSTest extends JmsTestSupport {
     private static final String TEST_INOUT_DESTINATION_NAME = 
"queue.producer.test.qos.inout";
 
     private static final String EXPIRED_MESSAGE_ROUTE_ID = 
"expiredAdvisoryRoute";
+    public static final String MOCK_EXPIRED_ADVISORY = "mock:expiredAdvisory";
+
+    @EndpointInject(uri = MOCK_EXPIRED_ADVISORY)
+    MockEndpoint mockExpiredAdvisory;
 
     @Test
     public void testInOutQueueProducerTTL() throws Exception {
-
-        NotifyBuilder expireMatcher = new NotifyBuilder(context)
-                .fromRoute(EXPIRED_MESSAGE_ROUTE_ID).whenCompleted(1).create();
+        mockExpiredAdvisory.expectedMessageCount(1);
 
         String endpoint = 
String.format("sjms:queue:%s?ttl=1000&exchangePattern=InOut&responseTimeOut=500",
 TEST_INOUT_DESTINATION_NAME);
 
@@ -51,8 +53,7 @@ public class QueueProducerQoSTest extends JmsTestSupport {
             // we're just interested in the message becoming expired
         }
 
-        // we should delay a bit so broker can run its expiration processes...
-        assertFalse(expireMatcher.matches(2, TimeUnit.SECONDS));
+        assertMockEndpointsSatisfied();
 
         DestinationViewMBean queue = 
getQueueMBean(TEST_INOUT_DESTINATION_NAME);
         assertEquals("There were unexpected messages left in the queue: " + 
TEST_INOUT_DESTINATION_NAME,
@@ -61,14 +62,12 @@ public class QueueProducerQoSTest extends JmsTestSupport {
 
     @Test
     public void testInOnlyQueueProducerTTL() throws Exception {
-        NotifyBuilder expireMatcher = new NotifyBuilder(context)
-                .fromRoute(EXPIRED_MESSAGE_ROUTE_ID).whenCompleted(1).create();
+        mockExpiredAdvisory.expectedMessageCount(1);
 
         String endpoint = String.format("sjms:queue:%s?ttl=1000", 
TEST_INONLY_DESTINATION_NAME);
         template.sendBody(endpoint, "test message");
 
-        // we should delay a bit so broker can run its expiration processes...
-        assertFalse(expireMatcher.matches(2, TimeUnit.SECONDS));
+        assertMockEndpointsSatisfied();
 
         DestinationViewMBean queue = 
getQueueMBean(TEST_INONLY_DESTINATION_NAME);
         assertEquals("There were unexpected messages left in the queue: " + 
TEST_INONLY_DESTINATION_NAME,
@@ -102,7 +101,7 @@ public class QueueProducerQoSTest extends JmsTestSupport {
             public void configure() throws Exception {
                 from("sjms:topic:ActiveMQ.Advisory.Expired.Queue.>")
                         .routeId(EXPIRED_MESSAGE_ROUTE_ID)
-                        .to("mock:expiredAdvisory");
+                        .to(MOCK_EXPIRED_ADVISORY);
             }
         };
     }

Reply via email to