Merged in sjms-batch component.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ab1d1dd7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ab1d1dd7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ab1d1dd7 Branch: refs/heads/master Commit: ab1d1dd78fe53edb50c4ede447e4ac5a55ee2ac9 Parents: 65f9a3a Author: jkorab <jakub.ko...@gmail.com> Authored: Thu Jul 16 11:32:29 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jul 17 14:56:22 2015 +0200 ---------------------------------------------------------------------- components/camel-sjms/pom.xml | 6 +- .../component/sjms/batch/SessionCompletion.java | 61 ++++ .../sjms/batch/SjmsBatchComponent.java | 53 ++++ .../component/sjms/batch/SjmsBatchConsumer.java | 306 +++++++++++++++++++ .../component/sjms/batch/SjmsBatchEndpoint.java | 133 ++++++++ .../sjms/consumer/AbstractMessageHandler.java | 3 +- .../component/sjms/jms/JmsMessageHelper.java | 21 +- .../org/apache/camel/component/sjms-batch | 18 ++ .../sjms/batch/EmbeddedActiveMQBroker.java | 74 +++++ .../sjms/batch/ListAggregationStrategy.java | 43 +++ .../sjms/batch/SjmsBatchConsumerTest.java | 247 +++++++++++++++ .../sjms/batch/SjmsBatchEndpointTest.java | 116 +++++++ 12 files changed, 1075 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-sjms/pom.xml b/components/camel-sjms/pom.xml index 9778517..6977796 100644 --- a/components/camel-sjms/pom.xml +++ b/components/camel-sjms/pom.xml @@ -32,7 +32,8 @@ <properties> <camel.osgi.export.pkg> org.apache.camel.component.sjms, - org.apache.camel.component.sjms.jms + org.apache.camel.component.sjms.jms, + org.apache.camel.component.sjms.batch </camel.osgi.export.pkg> <camel.osgi.private.pkg> org.apache.camel.component.sjms.consumer, @@ -41,7 +42,8 @@ org.apache.camel.component.sjms.tx </camel.osgi.private.pkg> <camel.osgi.export.service> - org.apache.camel.spi.ComponentResolver;component=sjms + org.apache.camel.spi.ComponentResolver;component=sjms, + org.apache.camel.spi.ComponentResolver;component=sjms-batch </camel.osgi.export.service> </properties> http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java new file mode 100644 index 0000000..27f06e6 --- /dev/null +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.batch; + +import org.apache.camel.Exchange; +import org.apache.camel.spi.Synchronization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.jms.Session; + +/** + * @author jkorab + */ +class SessionCompletion implements Synchronization { + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private final Session session; + + public SessionCompletion(Session session) { + assert (session != null); + this.session = session; + } + + @Override + public void onComplete(Exchange exchange) { + try { + log.debug("Committing"); + session.commit(); + } catch (JMSException ex) { + log.error("Exception caught while committing: {}", ex.getMessage()); + exchange.setException(ex); + } + } + + @Override + public void onFailure(Exchange exchange) { + try { + log.debug("Rolling back"); + session.rollback(); + } catch (JMSException ex) { + log.error("Exception caught while rolling back: {}", ex.getMessage()); + exchange.setException(ex); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java new file mode 100644 index 0000000..04b875d --- /dev/null +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.batch; + +import org.apache.camel.Endpoint; +import org.apache.camel.impl.UriEndpointComponent; +import org.apache.camel.util.ObjectHelper; + +import javax.jms.ConnectionFactory; +import java.util.Map; + +/** + * @author jkorab + */ +public class SjmsBatchComponent extends UriEndpointComponent { + + private ConnectionFactory connectionFactory; + + public SjmsBatchComponent() { + super(SjmsBatchEndpoint.class); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + ObjectHelper.notNull(connectionFactory, "connectionFactory is null"); + SjmsBatchEndpoint sjmsBatchEndpoint = new SjmsBatchEndpoint(uri, this, remaining); + setProperties(sjmsBatchEndpoint, parameters); + return sjmsBatchEndpoint; + } + + public ConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + public void setConnectionFactory(ConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java new file mode 100644 index 0000000..613d471 --- /dev/null +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.batch; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.sjms.jms.JmsMessageHelper; +import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Date; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author jkorab + */ +public class SjmsBatchConsumer extends DefaultConsumer { + private static final boolean TRANSACTED = true; + private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class); + + private final SjmsBatchEndpoint sjmsBatchEndpoint; + private final AggregationStrategy aggregationStrategy; + + private final int completionSize; + private final int completionTimeout; + private final int consumerCount; + private final int pollDuration; + + private final ConnectionFactory connectionFactory; + private final String destinationName; + private final Processor processor; + + private static AtomicInteger batchCount = new AtomicInteger(); + private static AtomicLong messagesReceived = new AtomicLong(); + private static AtomicLong messagesProcessed = new AtomicLong(); + private ExecutorService jmsConsumerExecutors; + + public SjmsBatchConsumer(SjmsBatchEndpoint sjmsBatchEndpoint, Processor processor) { + super(sjmsBatchEndpoint, processor); + + this.sjmsBatchEndpoint = ObjectHelper.notNull(sjmsBatchEndpoint, "batchJmsEndpoint"); + this.processor = ObjectHelper.notNull(processor, "processor"); + + destinationName = ObjectHelper.notEmpty(sjmsBatchEndpoint.getDestinationName(), "destinationName"); + + completionSize = sjmsBatchEndpoint.getCompletionSize(); + completionTimeout = sjmsBatchEndpoint.getCompletionTimeout(); + pollDuration = sjmsBatchEndpoint.getPollDuration(); + if (pollDuration < 0) { + throw new IllegalArgumentException("pollDuration must be 0 or greater"); + } + + this.aggregationStrategy = ObjectHelper.notNull(sjmsBatchEndpoint.getAggregationStrategy(), "aggregationStrategy"); + + consumerCount = sjmsBatchEndpoint.getConsumerCount(); + if (consumerCount <= 0) { + throw new IllegalArgumentException("consumerCount must be greater than 0"); + } + + SjmsBatchComponent sjmsBatchComponent = (SjmsBatchComponent) sjmsBatchEndpoint.getComponent(); + connectionFactory = ObjectHelper.notNull(sjmsBatchComponent.getConnectionFactory(), "jmsBatchComponent.connectionFactory"); + } + + @Override + public Endpoint getEndpoint() { + return sjmsBatchEndpoint; + } + + private final AtomicBoolean running = new AtomicBoolean(true); + private final AtomicReference<CountDownLatch> consumersShutdownLatchRef = new AtomicReference<>(); + + private Connection connection; + + @Override + protected void doStart() throws Exception { + super.doStart(); + + // start up a shared connection + try { + connection = connectionFactory.createConnection(); + connection.start(); + } catch (JMSException ex) { + LOG.error("Exception caught closing connection: {}", getStackTrace(ex)); + return; + } + + if (LOG.isInfoEnabled()) { + LOG.info("Starting " + consumerCount + " consumer(s) for " + destinationName + ":" + completionSize); + } + consumersShutdownLatchRef.set(new CountDownLatch(consumerCount)); + + jmsConsumerExecutors = getEndpoint().getCamelContext().getExecutorServiceManager() + .newFixedThreadPool(this, "SjmsBatchConsumer", consumerCount); + for (int i = 0; i < consumerCount; i++) { + jmsConsumerExecutors.execute(new BatchConsumptionLoop()); + } + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + running.set(false); + CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get(); + if (consumersShutdownLatch != null) { + LOG.info("Stop signalled, waiting on consumers to shut down"); + if (consumersShutdownLatch.await(60, TimeUnit.SECONDS)) { + LOG.warn("Timeout waiting on consumer threads to signal completion - shutting down"); + } else { + LOG.info("All consumers have shut down"); + } + } else { + LOG.info("Stop signalled while there are no consumers yet, so no need to wait for consumers"); + } + + try { + LOG.debug("Shutting down JMS connection"); + connection.close(); + } catch (JMSException jex) { + LOG.error("Exception caught closing connection: {}", getStackTrace(jex)); + } + + getEndpoint().getCamelContext().getExecutorServiceManager() + .shutdown(jmsConsumerExecutors); + } + + private String getStackTrace(Exception ex) { + StringWriter writer = new StringWriter(); + ex.printStackTrace(new PrintWriter(writer)); + return writer.toString(); + } + + private class BatchConsumptionLoop implements Runnable { + @Override + public void run() { + try { + // a batch corresponds to a single session that will be committed or rolled back by a background thread + final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE); + try { + // destinationName only creates queues; there is no additional value to be gained + // by transactionally consuming topic messages in batches + Queue queue = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(queue); + try { + consumeBatchesOnLoop(session, consumer); + } finally { + try { + consumer.close(); + } catch (JMSException ex2) { + log.error("Exception caught closing consumer: {}", ex2.getMessage()); + + } + } + } finally { + try { + session.close(); + } catch (JMSException ex1) { + log.error("Exception caught closing session: {}", ex1.getMessage()); + } + } + } catch (JMSException ex) { + // from loop + LOG.error("Exception caught consuming from {}: {}", destinationName, getStackTrace(ex)); + } finally { + // indicate that we have shut down + CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get(); + consumersShutdownLatch.countDown(); + } + } + + private void consumeBatchesOnLoop(Session session, MessageConsumer consumer) throws JMSException { + final boolean usingTimeout = completionTimeout > 0; + + batchConsumption: + while (running.get()) { + int messageCount = 0; + + // reset the clock counters + long timeElapsed = 0; + long startTime = 0; + Exchange aggregatedExchange = null; + + batch: + while ((completionSize <= 0) || (messageCount < completionSize)) { + // check periodically to see whether we should be shutting down + long waitTime = (usingTimeout && (timeElapsed > 0)) + ? getReceiveWaitTime(timeElapsed) + : pollDuration; + Message message = consumer.receive(waitTime); + + if (running.get()) { // no interruptions received + if (message == null) { + // timed out, no message received + LOG.trace("No message received"); + } else { + if ((usingTimeout) && (messageCount == 0)) { // this is the first message + startTime = new Date().getTime(); // start counting down the period for this batch + } + messageCount++; + LOG.debug("Message received: {}", messageCount); + if ((message instanceof ObjectMessage) + || (message instanceof TextMessage)) { + Exchange exchange = JmsMessageHelper.createExchange(message, getEndpoint()); + aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange); + aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount); + } else { + throw new IllegalArgumentException("Unexpected message type: " + + message.getClass().toString()); + } + } + + if ((usingTimeout) && (startTime > 0)) { + // a batch has been started, check whether it should be timed out + long currentTime = new Date().getTime(); + timeElapsed = currentTime - startTime; + + if (timeElapsed > completionTimeout) { + // batch finished by timeout + break batch; + } + } + + } else { + LOG.info("Shutdown signal received - rolling batch back"); + session.rollback(); + break batchConsumption; + } + } // batch + assert (aggregatedExchange != null); + process(aggregatedExchange, session); + } + } + + /** + * Determine the time that a call to {@link MessageConsumer#receive()} should wait given the time that has elapsed for this batch. + * @param timeElapsed The time that has elapsed. + * @return The shorter of the time remaining or poll duration. + */ + private long getReceiveWaitTime(long timeElapsed) { + long timeRemaining = getTimeRemaining(timeElapsed); + + // wait for the shorter of the time remaining or the poll duration + if (timeRemaining <= 0) { // ensure that the thread doesn't wait indefinitely + timeRemaining = 1; + } + final long waitTime = (timeRemaining > pollDuration) ? pollDuration : timeRemaining; + + LOG.debug("waiting for {}", waitTime); + return waitTime; + } + + private long getTimeRemaining(long timeElapsed) { + long timeRemaining = completionTimeout - timeElapsed; + if (LOG.isDebugEnabled() && (timeElapsed > 0)) { + LOG.debug("Time remaining this batch: {}", timeRemaining); + } + return timeRemaining; + } + + private void process(Exchange exchange, Session session) { + assert (exchange != null); + int id = batchCount.getAndIncrement(); + int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class); + if (LOG.isDebugEnabled()) { + LOG.debug("Processing batch:size={}:total={}", batchSize, messagesReceived.addAndGet(batchSize)); + } + + Synchronization committing = new SessionCompletion(session); + exchange.addOnCompletion(committing); + try { + processor.process(exchange); + LOG.debug("Completed processing[{}]:total={}", id, messagesProcessed.addAndGet(batchSize)); + } catch (Exception e) { + LOG.error("Error processing exchange: {}", e.getMessage()); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java new file mode 100644 index 0000000..afd5cbe --- /dev/null +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.batch; + +import org.apache.camel.Component; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; + +/** + * @author jkorab + */ +@UriEndpoint(scheme = "sjmsBatch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy", consumerClass = SjmsBatchComponent.class, label = "messaging") +public class SjmsBatchEndpoint extends DefaultEndpoint { + + public static final int DEFAULT_COMPLETION_SIZE = 200; // the default dispatch queue size in ActiveMQ + public static final int DEFAULT_COMPLETION_TIMEOUT = 500; + public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize"; + + @UriPath + @Metadata(required = "true") + private String destinationName; + + @UriParam(label = "consumer", defaultValue = "1", description = "The number of JMS sessions to consume from") + private Integer consumerCount = 1; + + @UriParam(label = "consumer", defaultValue = "200", + description = "The number of messages consumed at which the batch will be completed") + private Integer completionSize = DEFAULT_COMPLETION_SIZE; + + @UriParam(label = "consumer", defaultValue = "500", + description = "The timeout from receipt of the first first message when the batch will be completed") + private Integer completionTimeout = DEFAULT_COMPLETION_TIMEOUT; + + @UriParam(label = "consumer", defaultValue = "1000", + description = "The duration in milliseconds of each poll for messages. " + + "completionTimeOut will be used if it is shorter and a batch has started.") + private Integer pollDuration = 1000; + + @Metadata(required = "true") + @UriParam(label = "consumer", description = "A #-reference to an AggregationStrategy visible to Camel") + private AggregationStrategy aggregationStrategy; + + public SjmsBatchEndpoint() {} + + public SjmsBatchEndpoint(String endpointUri, Component component, String remaining) { + super(endpointUri, component); + this.destinationName = remaining; + } + + @Override + public boolean isSingleton() { + return true; + } + + @Override + public Producer createProducer() throws Exception { + throw new UnsupportedOperationException("Cannot produce though a " + SjmsBatchEndpoint.class.getName()); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new SjmsBatchConsumer(this, processor); + } + + public AggregationStrategy getAggregationStrategy() { + return aggregationStrategy; + } + + public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { + this.aggregationStrategy = aggregationStrategy; + } + + public Integer getCompletionSize() { + return completionSize; + } + + public void setCompletionSize(Integer completionSize) { + this.completionSize = completionSize; + } + + public Integer getCompletionTimeout() { + return completionTimeout; + } + + public void setCompletionTimeout(Integer completionTimeout) { + this.completionTimeout = completionTimeout; + } + + public String getDestinationName() { + return destinationName; + } + + public void setDestinationName(String destinationName) { + this.destinationName = destinationName; + } + + public Integer getConsumerCount() { + return consumerCount; + } + + public void setConsumerCount(Integer consumerCount) { + this.consumerCount = consumerCount; + } + + public Integer getPollDuration() { + return pollDuration; + } + + public void setPollDuration(Integer pollDuration) { + this.pollDuration = pollDuration; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java index c3c5e55..1598a43 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java @@ -70,7 +70,8 @@ public abstract class AbstractMessageHandler implements MessageListener { public void onMessage(Message message) { RuntimeCamelException rce = null; try { - final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, getEndpoint()); + SjmsEndpoint endpoint = (SjmsEndpoint) getEndpoint(); + final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, endpoint, endpoint.getJmsKeyFormatStrategy()); log.debug("Processing Exchange.id:{}", exchange.getExchangeId()); http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java index dcccd9b..79787c9 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java @@ -62,8 +62,23 @@ public final class JmsMessageHelper implements JmsConstants { } public static Exchange createExchange(Message message, Endpoint endpoint) { + return createExchange(message, endpoint, null); + } + + /** + * Creates an Exchange from a JMS Message. + * @param message The JMS message. + * @param endpoint The Endpoint to use to create the Exchange object. + * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to + * format keys in a JMS 1.1 compliant manner. If null the + * {@link DefaultJmsKeyFormatStrategy} will be used. + * @return Populated Exchange. + */ + public static Exchange createExchange(Message message, Endpoint endpoint, KeyFormatStrategy keyFormatStrategy) { Exchange exchange = endpoint.createExchange(); - return populateExchange(message, exchange, false, ((SjmsEndpoint)endpoint).getJmsKeyFormatStrategy()); + KeyFormatStrategy initialisedKeyFormatStrategy = (keyFormatStrategy == null) + ? new DefaultJmsKeyFormatStrategy() : keyFormatStrategy; + return populateExchange(message, exchange, false, initialisedKeyFormatStrategy); } @SuppressWarnings("unchecked") @@ -222,11 +237,11 @@ public final class JmsMessageHelper implements JmsConstants { * @param jmsMessage the {@link Message} to add or update the headers on * @param messageHeaders a {@link Map} of String/Object pairs * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to - * format keys in a JMS 1.1 compliant manner. If null the - * {@link DefaultJmsKeyFormatStrategy} will be used. + * format keys in a JMS 1.1 compliant manner. * @return {@link Message} */ private static Message setJmsMessageHeaders(final Message jmsMessage, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws IllegalHeaderException { + Map<String, Object> headers = new HashMap<String, Object>(messageHeaders); for (final Map.Entry<String, Object> entry : headers.entrySet()) { String headerName = entry.getKey(); http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch b/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch new file mode 100644 index 0000000..9ee9e4c --- /dev/null +++ b/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.sjms.batch.SjmsBatchComponent http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java new file mode 100644 index 0000000..fd1ed27 --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.batch; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.apache.camel.test.AvailablePortFinder; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JUnit Test aspect that creates an embedded ActiveMQ broker at the beginning of each test and shuts it down after. + */ +public class EmbeddedActiveMQBroker extends ExternalResource { + + private final Logger log = LoggerFactory.getLogger(EmbeddedActiveMQBroker.class); + private final String brokerId; + private BrokerService brokerService; + private final String tcpConnectorUri; + + public EmbeddedActiveMQBroker(String brokerId) { + if ((brokerId == null) || (brokerId.isEmpty())) { + throw new IllegalArgumentException("brokerId is empty"); + } + this.brokerId = brokerId; + tcpConnectorUri = "tcp://localhost:" + AvailablePortFinder.getNextAvailable(); + + brokerService = new BrokerService(); + brokerService.setBrokerId(brokerId); + brokerService.setPersistent(false); + brokerService.setUseJmx(false); + try { + brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); + brokerService.addConnector(tcpConnectorUri); + } catch (Exception e) { + throw new RuntimeException("Problem creating brokerService", e); + } + } + + @Override + protected void before() throws Throwable { + log.info("Starting embedded broker[{}] on {}", brokerId, tcpConnectorUri); + brokerService.start(); + } + + @Override + protected void after() { + try { + log.info("Stopping embedded broker[{}]", brokerId); + brokerService.stop(); + } catch (Exception e) { + throw new RuntimeException("Exception shutting down broker service", e); + } + } + + public String getTcpConnectorUri() { + return tcpConnectorUri; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java new file mode 100644 index 0000000..39774b2 --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.batch; + +import org.apache.camel.Exchange; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author jkorab + */ +public class ListAggregationStrategy implements AggregationStrategy { + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + String body = newExchange.getIn().getBody(String.class); + if (oldExchange == null) { + List<String> list = new ArrayList<String>(); + list.add(body); + newExchange.getIn().setBody(list); + return newExchange; + } else { + List<String> list = oldExchange.getIn().getBody(List.class); + list.add(body); + return oldExchange; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java new file mode 100644 index 0000000..58fc717 --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.batch; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sjms.SjmsComponent; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.camel.util.StopWatch; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.ConnectionFactory; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; + +/** + * @author jkorab + */ +public class SjmsBatchConsumerTest extends CamelTestSupport { + private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumerTest.class); + + @Rule + public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker("localhost"); + + @Override + public CamelContext createCamelContext() throws Exception { + SimpleRegistry registry = new SimpleRegistry(); + registry.put("testStrategy", new ListAggregationStrategy()); + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTcpConnectorUri()); + + SjmsComponent sjmsComponent = new SjmsComponent(); + sjmsComponent.setConnectionFactory(connectionFactory); + + SjmsBatchComponent sjmsBatchComponent = new SjmsBatchComponent(); + sjmsBatchComponent.setConnectionFactory(connectionFactory); + + CamelContext context = new DefaultCamelContext(registry); + context.addComponent("sjms", sjmsComponent); + context.addComponent("sjmsbatch", sjmsBatchComponent); + return context; + } + + private static class TransactedSendHarness extends RouteBuilder { + private final String queueName; + + public TransactedSendHarness(String queueName) { + this.queueName = queueName; + } + + @Override + public void configure() throws Exception { + from("direct:in").routeId("harness").startupOrder(20) + .split(body()) + .toF("sjms:queue:%s?transacted=true", queueName) + .to("mock:before") + .end(); + } + } + + @Override + public boolean isUseAdviceWith() { + return true; + } + + @Test + public void testConsumption() throws Exception { + + final int messageCount = 10000; + final int consumerCount = 5; + + final String queueName = getQueueName(); + context.addRoutes(new TransactedSendHarness(queueName)); + context.addRoutes(new RouteBuilder() { + public void configure() throws Exception { + + int completionTimeout = 1000; + int completionSize = 200; + + fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s" + + "&consumerCount=%s&aggregationStrategy=#testStrategy", + queueName, completionTimeout, completionSize, consumerCount) + .routeId("batchConsumer").startupOrder(10).autoStartup(false) + .split(body()) + .to("mock:split"); + } + }); + context.start(); + + MockEndpoint mockBefore = getMockEndpoint("mock:before"); + mockBefore.setExpectedMessageCount(messageCount); + + MockEndpoint mockSplit = getMockEndpoint("mock:split"); + mockSplit.setExpectedMessageCount(messageCount); + + LOG.info("Sending messages"); + template.sendBody("direct:in", generateStrings(messageCount)); + LOG.info("Send complete"); + + StopWatch stopWatch = new StopWatch(); + context.startRoute("batchConsumer"); + assertMockEndpointsSatisfied(); + long time = stopWatch.stop(); + + LOG.info("Processed {} messages in {} ms", messageCount, time); + LOG.info("Average throughput {} msg/s", (long) (messageCount / (time / 1000d))); + } + + @Test + public void testConsumption_completionSize() throws Exception { + final int completionSize = 5; + final int completionTimeout = -1; // size-based only + + final String queueName = getQueueName(); + context.addRoutes(new TransactedSendHarness(queueName)); + context.addRoutes(new RouteBuilder() { + public void configure() throws Exception { + fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", + queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10) + .log(LoggingLevel.DEBUG, "${body.size}") + .to("mock:batches"); + } + }); + context.start(); + + int messageCount = 100; + MockEndpoint mockBatches = getMockEndpoint("mock:batches"); + mockBatches.expectedMessageCount(messageCount / completionSize); + + template.sendBody("direct:in", generateStrings(messageCount)); + mockBatches.assertIsSatisfied(); + } + + @Test + public void testConsumption_completionTimeout() throws Exception { + final int completionTimeout = 2000; + final int completionSize = -1; // timeout-based only + + final String queueName = getQueueName(); + context.addRoutes(new TransactedSendHarness(queueName)); + context.addRoutes(new RouteBuilder() { + public void configure() throws Exception { + fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", + queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10) + .to("mock:batches"); + } + }); + context.start(); + + int messageCount = 50; + assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE); + MockEndpoint mockBatches = getMockEndpoint("mock:batches"); + mockBatches.expectedMessageCount(1); // everything batched together + + template.sendBody("direct:in", generateStrings(messageCount)); + mockBatches.assertIsSatisfied(); + assertFirstMessageBodyOfLength(mockBatches, messageCount); + } + + /** + * Checks whether multiple consumer endpoints can operate in parallel. + */ + @Test + public void testConsumption_multipleConsumerEndpoints() throws Exception { + final int completionTimeout = 2000; + final int completionSize = 5; + + final String queueName = getQueueName(); + context.addRoutes(new RouteBuilder() { + public void configure() throws Exception { + + from("direct:in") + .split().body() + .multicast() + .toF("sjms:%s", queueName + "A") + .toF("sjms:%s", queueName + "B") + .end(); + + fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", + queueName + "A", completionTimeout, completionSize).routeId("batchConsumerA") + .to("mock:outA"); + + fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy", + queueName + "B", completionTimeout, completionSize).routeId("batchConsumerB") + .to("mock:outB"); + + } + }); + context.start(); + + int messageCount = 5; + + assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE); + MockEndpoint mockOutA = getMockEndpoint("mock:outA"); + mockOutA.expectedMessageCount(1); // everything batched together + MockEndpoint mockOutB = getMockEndpoint("mock:outB"); + mockOutB.expectedMessageCount(1); // everything batched together + + template.sendBody("direct:in", generateStrings(messageCount)); + assertMockEndpointsSatisfied(); + + assertFirstMessageBodyOfLength(mockOutA, messageCount); + assertFirstMessageBodyOfLength(mockOutB, messageCount); + } + + private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) { + Exchange exchange = mockEndpoint.getExchanges().get(0); + assertEquals(expectedLength, exchange.getIn().getBody(List.class).size()); + } + + private String getQueueName() { + SimpleDateFormat sdf = new SimpleDateFormat("yyMMddhhmmss"); + return "sjmsbatch-" + sdf.format(new Date()); + } + + private String[] generateStrings(int messageCount) { + String[] strings = new String[messageCount]; + for (int i = 0; i < messageCount; i++) { + strings[i] = "message:" + i; + } + return strings; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java new file mode 100644 index 0000000..7a75e76 --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.batch; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.sjms.SjmsComponent; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.camel.util.toolbox.AggregationStrategies; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * @author jkorab + */ +public class SjmsBatchEndpointTest extends CamelTestSupport { + + // Create one embedded broker instance for the entire test, as we aren't actually + // going to send any messages to it; we just need it so that the ConnectionFactory + // has something local to connect to. + public static EmbeddedActiveMQBroker broker; + + @BeforeClass + public static void setupBroker() { + broker = new EmbeddedActiveMQBroker("localhost"); + try { + broker.before(); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + @AfterClass + public static void shutDownBroker() { + broker.after(); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + SimpleRegistry registry = new SimpleRegistry(); + registry.put("aggStrategy", AggregationStrategies.groupedExchange()); + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + connectionFactory.setBrokerURL(broker.getTcpConnectorUri()); + + SjmsComponent sjmsComponent = new SjmsComponent(); + sjmsComponent.setConnectionFactory(connectionFactory); + + SjmsBatchComponent sjmsBatchComponent = new SjmsBatchComponent(); + sjmsBatchComponent.setConnectionFactory(connectionFactory); + + CamelContext context = new DefaultCamelContext(registry); + context.addComponent("sjmsbatch", sjmsBatchComponent); + context.addComponent("sjms", sjmsComponent); + + return context; + } + + @Override + public boolean isUseAdviceWith() { + return true; + } + + @Test(expected = org.apache.camel.FailedToCreateProducerException.class) + public void testProducerFailure() throws Exception { + context.addRoutes(new RouteBuilder() { + public void configure() throws Exception { + from("direct:in").to("sjmsbatch:testQueue"); + } + }); + context.start(); + } + + @Test(expected = IllegalArgumentException.class) + public void testConsumer_negativePollDuration() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("sjmsbatch:in?aggregationStrategy=#aggStrategy&pollDuration=-1") + .to("mock:out"); + } + }); + context.start(); + } + + @Test(expected = IllegalArgumentException.class) + public void testConsumer_negativeConsumerCount() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("sjmsbatch:in?aggregationStrategy=#aggStrategy&consumerCount=-1") + .to("mock:out"); + } + }); + context.start(); + } + +}