Polished and fixed CS and a few other bits
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/832a99c5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/832a99c5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/832a99c5 Branch: refs/heads/master Commit: 832a99c546d4fca5510a88136de8fb2002c54d6f Parents: 2ba152d Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jul 17 15:25:15 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jul 17 15:25:15 2015 +0200 ---------------------------------------------------------------------- .../camel/component/sjms/SjmsEndpoint.java | 7 +- .../component/sjms/batch/SessionCompletion.java | 22 ++-- .../sjms/batch/SjmsBatchComponent.java | 14 +-- .../component/sjms/batch/SjmsBatchConsumer.java | 79 +++++++------- .../component/sjms/batch/SjmsBatchEndpoint.java | 104 +++++++++---------- .../sjms/jms/DestinationNameParser.java | 1 + .../sjms/batch/ListAggregationStrategy.java | 12 +-- .../sjms/batch/SjmsBatchConsumerTest.java | 43 ++++---- .../sjms/batch/SjmsBatchEndpointTest.java | 11 +- .../sjms/jms/DestinationNameParserTest.java | 4 +- .../sjms/producer/QueueProducerQoSTest.java | 2 +- 11 files changed, 145 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java index 0e8d68a..6ffa513 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java @@ -22,7 +22,12 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.component.sjms.jms.*; +import org.apache.camel.component.sjms.jms.ConnectionResource; +import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy; +import org.apache.camel.component.sjms.jms.DestinationCreationStrategy; +import org.apache.camel.component.sjms.jms.DestinationNameParser; +import org.apache.camel.component.sjms.jms.KeyFormatStrategy; +import org.apache.camel.component.sjms.jms.SessionAcknowledgementType; import org.apache.camel.component.sjms.producer.InOnlyProducer; import org.apache.camel.component.sjms.producer.InOutProducer; import org.apache.camel.impl.DefaultEndpoint; http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java index 27f06e6..cae90cb 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java @@ -16,34 +16,32 @@ */ package org.apache.camel.component.sjms.batch; +import javax.jms.JMSException; +import javax.jms.Session; + import org.apache.camel.Exchange; import org.apache.camel.spi.Synchronization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.JMSException; -import javax.jms.Session; - -/** - * @author jkorab - */ class SessionCompletion implements Synchronization { - private final Logger log = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOG = LoggerFactory.getLogger(SessionCompletion.class); private final Session session; + // TODO: add more details in the commit/rollback eg such as message id + public SessionCompletion(Session session) { - assert (session != null); this.session = session; } @Override public void onComplete(Exchange exchange) { try { - log.debug("Committing"); + LOG.debug("Committing"); session.commit(); } catch (JMSException ex) { - log.error("Exception caught while committing: {}", ex.getMessage()); + LOG.error("Exception caught while committing: {}", ex.getMessage()); exchange.setException(ex); } } @@ -51,10 +49,10 @@ class SessionCompletion implements Synchronization { @Override public void onFailure(Exchange exchange) { try { - log.debug("Rolling back"); + LOG.debug("Rolling back"); session.rollback(); } catch (JMSException ex) { - log.error("Exception caught while rolling back: {}", ex.getMessage()); + LOG.error("Exception caught while rolling back: {}", ex.getMessage()); exchange.setException(ex); } } http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java index 04b875d..421fd8a 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java @@ -16,16 +16,13 @@ */ package org.apache.camel.component.sjms.batch; +import java.util.Map; +import javax.jms.ConnectionFactory; + import org.apache.camel.Endpoint; import org.apache.camel.impl.UriEndpointComponent; import org.apache.camel.util.ObjectHelper; -import javax.jms.ConnectionFactory; -import java.util.Map; - -/** - * @author jkorab - */ public class SjmsBatchComponent extends UriEndpointComponent { private ConnectionFactory connectionFactory; @@ -36,7 +33,7 @@ public class SjmsBatchComponent extends UriEndpointComponent { @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - ObjectHelper.notNull(connectionFactory, "connectionFactory is null"); + ObjectHelper.notNull(connectionFactory, "connectionFactory"); SjmsBatchEndpoint sjmsBatchEndpoint = new SjmsBatchEndpoint(uri, this, remaining); setProperties(sjmsBatchEndpoint, parameters); return sjmsBatchEndpoint; @@ -46,6 +43,9 @@ public class SjmsBatchComponent extends UriEndpointComponent { return connectionFactory; } + /** + * A ConnectionFactory is required to enable the SjmsBatchComponent. + */ public void setConnectionFactory(ConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java index ca47c7c..ee2b250 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java @@ -16,16 +16,6 @@ */ package org.apache.camel.component.sjms.batch; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.component.sjms.jms.JmsMessageHelper; -import org.apache.camel.impl.DefaultConsumer; -import org.apache.camel.processor.aggregate.AggregationStrategy; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.*; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Date; @@ -36,30 +26,47 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.sjms.jms.JmsMessageHelper; +import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -/** - * @author jkorab - */ public class SjmsBatchConsumer extends DefaultConsumer { private static final boolean TRANSACTED = true; - private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class); + + // global counters, maybe they should be on component instead? + private static final AtomicInteger BATCH_COUNT = new AtomicInteger(); + private static final AtomicLong MESSAGE_RECEIVED = new AtomicLong(); + private static final AtomicLong MESSAGE_PROCESSED = new AtomicLong(); private final SjmsBatchEndpoint sjmsBatchEndpoint; private final AggregationStrategy aggregationStrategy; - private final int completionSize; private final int completionTimeout; private final int consumerCount; private final int pollDuration; - private final ConnectionFactory connectionFactory; private final String destinationName; private final Processor processor; - - private static AtomicInteger batchCount = new AtomicInteger(); - private static AtomicLong messagesReceived = new AtomicLong(); - private static AtomicLong messagesProcessed = new AtomicLong(); private ExecutorService jmsConsumerExecutors; + private final AtomicBoolean running = new AtomicBoolean(true); + private final AtomicReference<CountDownLatch> consumersShutdownLatchRef = new AtomicReference<>(); + private Connection connection; public SjmsBatchConsumer(SjmsBatchEndpoint sjmsBatchEndpoint, Processor processor) { super(sjmsBatchEndpoint, processor); @@ -92,11 +99,6 @@ public class SjmsBatchConsumer extends DefaultConsumer { return sjmsBatchEndpoint; } - private final AtomicBoolean running = new AtomicBoolean(true); - private final AtomicReference<CountDownLatch> consumersShutdownLatchRef = new AtomicReference<>(); - - private Connection connection; - @Override protected void doStart() throws Exception { super.doStart(); @@ -145,8 +147,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { LOG.error("Exception caught closing connection: {}", getStackTrace(jex)); } - getEndpoint().getCamelContext().getExecutorServiceManager() - .shutdown(jmsConsumerExecutors); + getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(jmsConsumerExecutors); } private String getStackTrace(Exception ex) { @@ -174,7 +175,6 @@ public class SjmsBatchConsumer extends DefaultConsumer { consumer.close(); } catch (JMSException ex2) { log.error("Exception caught closing consumer: {}", ex2.getMessage()); - } } } finally { @@ -197,7 +197,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { private void consumeBatchesOnLoop(Session session, MessageConsumer consumer) throws JMSException { final boolean usingTimeout = completionTimeout > 0; - batchConsumption: + batchConsumption: while (running.get()) { int messageCount = 0; @@ -206,7 +206,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { long startTime = 0; Exchange aggregatedExchange = null; - batch: + batch: while ((completionSize <= 0) || (messageCount < completionSize)) { // check periodically to see whether we should be shutting down long waitTime = (usingTimeout && (timeElapsed > 0)) @@ -219,7 +219,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { // timed out, no message received LOG.trace("No message received"); } else { - if ((usingTimeout) && (messageCount == 0)) { // this is the first message + if (usingTimeout && messageCount == 0) { // this is the first message startTime = new Date().getTime(); // start counting down the period for this batch } messageCount++; @@ -230,12 +230,11 @@ public class SjmsBatchConsumer extends DefaultConsumer { aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange); aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount); } else { - throw new IllegalArgumentException("Unexpected message type: " - + message.getClass().toString()); + throw new IllegalArgumentException("Unexpected message type: " + message.getClass().toString()); } } - if ((usingTimeout) && (startTime > 0)) { + if (usingTimeout && startTime > 0) { // a batch has been started, check whether it should be timed out long currentTime = new Date().getTime(); timeElapsed = currentTime - startTime; @@ -252,13 +251,13 @@ public class SjmsBatchConsumer extends DefaultConsumer { break batchConsumption; } } // batch - assert (aggregatedExchange != null); process(aggregatedExchange, session); } } /** * Determine the time that a call to {@link MessageConsumer#receive()} should wait given the time that has elapsed for this batch. + * * @param timeElapsed The time that has elapsed. * @return The shorter of the time remaining or poll duration. */ @@ -277,25 +276,25 @@ public class SjmsBatchConsumer extends DefaultConsumer { private long getTimeRemaining(long timeElapsed) { long timeRemaining = completionTimeout - timeElapsed; - if (LOG.isDebugEnabled() && (timeElapsed > 0)) { + if (LOG.isDebugEnabled() && timeElapsed > 0) { LOG.debug("Time remaining this batch: {}", timeRemaining); } return timeRemaining; } private void process(Exchange exchange, Session session) { - assert (exchange != null); - int id = batchCount.getAndIncrement(); + int id = BATCH_COUNT.getAndIncrement(); int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class); if (LOG.isDebugEnabled()) { - LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + messagesReceived.addAndGet(batchSize)); + LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + MESSAGE_RECEIVED.addAndGet(batchSize)); } SessionCompletion sessionCompletion = new SessionCompletion(session); exchange.addOnCompletion(sessionCompletion); try { processor.process(exchange); - LOG.debug("Completed processing[{}]:total={}", id, messagesProcessed.addAndGet(batchSize)); + long total = MESSAGE_PROCESSED.addAndGet(batchSize); + LOG.debug("Completed processing[{}]:total={}", id, total); } catch (Exception e) { LOG.error("Error processing exchange: {}", e.getMessage()); } http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java index 5d307a7..b4c052f 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java @@ -28,12 +28,7 @@ import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; -/** - * @author jkorab - */ -@UriEndpoint(scheme = "sjmsBatch", - title = "Simple JMS Batch Component", - syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy", +@UriEndpoint(scheme = "sjms-batch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName", consumerClass = SjmsBatchComponent.class, label = "messaging") public class SjmsBatchEndpoint extends DefaultEndpoint { @@ -41,41 +36,29 @@ public class SjmsBatchEndpoint extends DefaultEndpoint { public static final int DEFAULT_COMPLETION_TIMEOUT = 500; public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize"; - @UriPath - @Metadata(required = "true", - description = "The destination name. Only queues are supported, names may be prefixed by 'queue:'.") + @UriPath(label = "consumer") @Metadata(required = "true") private String destinationName; - - @UriParam(label = "consumer", defaultValue = "1", description = "The number of JMS sessions to consume from") - private Integer consumerCount = 1; - - @UriParam(label = "consumer", defaultValue = "200", - description = "The number of messages consumed at which the batch will be completed") - private Integer completionSize = DEFAULT_COMPLETION_SIZE; - - @UriParam(label = "consumer", defaultValue = "500", - description = "The timeout from receipt of the first first message when the batch will be completed") - private Integer completionTimeout = DEFAULT_COMPLETION_TIMEOUT; - - @UriParam(label = "consumer", defaultValue = "1000", - description = "The duration in milliseconds of each poll for messages. " + - "completionTimeOut will be used if it is shorter and a batch has started.") - private Integer pollDuration = 1000; - - @Metadata(required = "true") - @UriParam(label = "consumer", description = "A #-reference to an AggregationStrategy visible to Camel") + @UriParam(label = "consumer", defaultValue = "1") + private int consumerCount = 1; + @UriParam(label = "consumer", defaultValue = "200") + private int completionSize = DEFAULT_COMPLETION_SIZE; + @UriParam(label = "consumer", defaultValue = "500") + private int completionTimeout = DEFAULT_COMPLETION_TIMEOUT; + @UriParam(label = "consumer", defaultValue = "1000") + private int pollDuration = 1000; + @UriParam(label = "consumer") @Metadata(required = "true") private AggregationStrategy aggregationStrategy; - private boolean topic; - - public SjmsBatchEndpoint() {} + public SjmsBatchEndpoint() { + } public SjmsBatchEndpoint(String endpointUri, Component component, String remaining) { super(endpointUri, component); + DestinationNameParser parser = new DestinationNameParser(); if (parser.isTopic(remaining)) { - throw new IllegalArgumentException("Only batch consumption from queues is supported. For topics you " + - "should use a regular JMS consumer with an aggregator."); + throw new IllegalArgumentException("Only batch consumption from queues is supported. For topics you " + + "should use a regular JMS consumer with an aggregator."); } this.destinationName = parser.getShortName(remaining); } @@ -99,47 +82,62 @@ public class SjmsBatchEndpoint extends DefaultEndpoint { return aggregationStrategy; } + /** + * The aggregation strategy to use, which merges all the batched messages into a single message + */ public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { this.aggregationStrategy = aggregationStrategy; } - public Integer getCompletionSize() { - return completionSize; - } - - public void setCompletionSize(Integer completionSize) { - this.completionSize = completionSize; + /** + * The destination name. Only queues are supported, names may be prefixed by 'queue:'. + */ + public String getDestinationName() { + return destinationName; } - public Integer getCompletionTimeout() { - return completionTimeout; + public int getConsumerCount() { + return consumerCount; } - public void setCompletionTimeout(Integer completionTimeout) { - this.completionTimeout = completionTimeout; + /** + * The number of JMS sessions to consume from + */ + public void setConsumerCount(int consumerCount) { + this.consumerCount = consumerCount; } - public String getDestinationName() { - return destinationName; + public int getCompletionSize() { + return completionSize; } - public void setDestinationName(String destinationName) { - this.destinationName = destinationName; + /** + * The number of messages consumed at which the batch will be completed + */ + public void setCompletionSize(int completionSize) { + this.completionSize = completionSize; } - public Integer getConsumerCount() { - return consumerCount; + public int getCompletionTimeout() { + return completionTimeout; } - public void setConsumerCount(Integer consumerCount) { - this.consumerCount = consumerCount; + /** + * The timeout from receipt of the first first message when the batch will be completed + */ + public void setCompletionTimeout(int completionTimeout) { + this.completionTimeout = completionTimeout; } - public Integer getPollDuration() { + public int getPollDuration() { return pollDuration; } - public void setPollDuration(Integer pollDuration) { + /** + * The duration in milliseconds of each poll for messages. + * completionTimeOut will be used if it is shorter and a batch has started. + */ + public void setPollDuration(int pollDuration) { this.pollDuration = pollDuration; } http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java index d248350..ddc213d 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java @@ -20,6 +20,7 @@ package org.apache.camel.component.sjms.jms; * @author jkorab */ public class DestinationNameParser { + public boolean isTopic(String destinationName) { if (destinationName == null) { throw new IllegalArgumentException("destinationName is null"); http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java index 39774b2..d1eb6e5 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java @@ -16,16 +16,14 @@ */ package org.apache.camel.component.sjms.batch; -import org.apache.camel.Exchange; -import org.apache.camel.processor.aggregate.AggregationStrategy; - import java.util.ArrayList; import java.util.List; -/** - * @author jkorab - */ +import org.apache.camel.Exchange; +import org.apache.camel.processor.aggregate.AggregationStrategy; + public class ListAggregationStrategy implements AggregationStrategy { + @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { String body = newExchange.getIn().getBody(String.class); @@ -34,7 +32,7 @@ public class ListAggregationStrategy implements AggregationStrategy { list.add(body); newExchange.getIn().setBody(list); return newExchange; - } else { + } else { List<String> list = oldExchange.getIn().getBody(List.class); list.add(body); return oldExchange; http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java index 642a38f..76c739b 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java @@ -16,8 +16,12 @@ */ package org.apache.camel.component.sjms.batch; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import javax.jms.ConnectionFactory; + import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; @@ -34,16 +38,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.ConnectionFactory; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; - -/** - * @author jkorab - */ public class SjmsBatchConsumerTest extends CamelTestSupport { - private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumerTest.class); + private static final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumerTest.class); @Rule public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker("localhost"); @@ -76,10 +72,10 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { @Override public void configure() throws Exception { from("direct:in").routeId("harness").startupOrder(20) - .split(body()) + .split(body()) .toF("sjms:queue:%s?transacted=true", queueName) .to("mock:before") - .end(); + .end(); } } @@ -102,12 +98,11 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { int completionTimeout = 1000; int completionSize = 200; - fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s" + - "&consumerCount=%s&aggregationStrategy=#testStrategy", + fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&consumerCount=%s&aggregationStrategy=#testStrategy", queueName, completionTimeout, completionSize, consumerCount) .routeId("batchConsumer").startupOrder(10).autoStartup(false) - .split(body()) - .to("mock:split"); + .split(body()) + .to("mock:split"); } }); context.start(); @@ -132,7 +127,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { } @Test - public void testConsumption_completionSize() throws Exception { + public void testConsumptionCompletionSize() throws Exception { final int completionSize = 5; final int completionTimeout = -1; // size-based only @@ -157,7 +152,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { } @Test - public void testConsumption_completionTimeout() throws Exception { + public void testConsumptionCompletionTimeout() throws Exception { final int completionTimeout = 2000; final int completionSize = -1; // timeout-based only @@ -186,7 +181,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { * Checks whether multiple consumer endpoints can operate in parallel. */ @Test - public void testConsumption_multipleConsumerEndpoints() throws Exception { + public void testConsumptionMultipleConsumerEndpoints() throws Exception { final int completionTimeout = 2000; final int completionSize = 5; @@ -196,10 +191,10 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { from("direct:in") .split().body() - .multicast() - .toF("sjms:%s", queueName + "A") - .toF("sjms:%s", queueName + "B") - .end(); + .multicast() + .toF("sjms:%s", queueName + "A") + .toF("sjms:%s", queueName + "B") + .end(); fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", queueName + "A", completionTimeout, completionSize).routeId("batchConsumerA") @@ -229,7 +224,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { } @Test - public void testConsumption_rollback() throws Exception { + public void testConsumptionRollback() throws Exception { final int completionTimeout = 2000; final int completionSize = 5; http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java index c97d0dd..45fe324 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java @@ -30,9 +30,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -/** - * @author jkorab - */ public class SjmsBatchEndpointTest extends CamelTestSupport { // Create one embedded broker instance for the entire test, as we aren't actually @@ -92,19 +89,19 @@ public class SjmsBatchEndpointTest extends CamelTestSupport { } @Test(expected = IllegalArgumentException.class) - public void testConsumer_negativePollDuration() throws Exception { + public void testConsumerNegativePollDuration() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("sjms-batch:in?aggregationStrategy=#aggStrategy&pollDuration=-1") - .to("mock:out"); + .to("mock:out"); } }); context.start(); } @Test(expected = IllegalArgumentException.class) - public void testConsumer_negativeConsumerCount() throws Exception { + public void testConsumerNegativeConsumerCount() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { @@ -116,7 +113,7 @@ public class SjmsBatchEndpointTest extends CamelTestSupport { } @Test(expected = FailedToCreateRouteException.class) - public void testConsumer_topic() throws Exception { + public void testConsumerTopic() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java index 3fb4968..7f25239 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java @@ -34,7 +34,7 @@ public class DestinationNameParserTest { } @Test(expected = IllegalArgumentException.class) - public void testIsTopic_nullDestinationName() throws Exception { + public void testIsTopicNullDestinationName() throws Exception { DestinationNameParser parser = new DestinationNameParser(); parser.isTopic(null); } @@ -48,7 +48,7 @@ public class DestinationNameParserTest { } @Test(expected = IllegalArgumentException.class) - public void testGetShortName_nullDestinationName() throws Exception { + public void testGetShortNameNullDestinationName() throws Exception { DestinationNameParser parser = new DestinationNameParser(); parser.getShortName(null); } http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java index beef0a9..2a925f0 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java @@ -33,7 +33,7 @@ public class QueueProducerQoSTest extends JmsTestSupport { private static final String TEST_INOUT_DESTINATION_NAME = "queue.producer.test.qos.inout"; private static final String EXPIRED_MESSAGE_ROUTE_ID = "expiredAdvisoryRoute"; - public static final String MOCK_EXPIRED_ADVISORY = "mock:expiredAdvisory"; + private static final String MOCK_EXPIRED_ADVISORY = "mock:expiredAdvisory"; @EndpointInject(uri = MOCK_EXPIRED_ADVISORY) MockEndpoint mockExpiredAdvisory;