Added documentation and a test case for the sjms-batch keepAliveDelay option
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/94496488 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/94496488 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/94496488 Branch: refs/heads/master Commit: 944964888ed512501ed7495f51dc7468a3059c46 Parents: bd6b87c Author: Bryan Love <bryan.l...@iovation.com> Authored: Thu Mar 23 14:04:36 2017 -0700 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Mar 28 10:03:54 2017 +0200 ---------------------------------------------------------------------- .../src/main/docs/sjms-batch-component.adoc | 3 +- .../component/sjms/batch/SjmsBatchConsumer.java | 6 +-- .../component/sjms/batch/SjmsBatchEndpoint.java | 12 ++++- .../sjms/batch/SjmsBatchConsumerTest.java | 49 +++++++++++++++++++- .../component/sjms/support/MockConnection.java | 43 +++++++++++++++++ .../sjms/support/MockConnectionFactory.java | 42 +++++++++++++++++ .../sjms/support/MockMessageConsumer.java | 29 ++++++++++++ .../component/sjms/support/MockSession.java | 45 ++++++++++++++++++ 8 files changed, 222 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/main/docs/sjms-batch-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/docs/sjms-batch-component.adoc b/components/camel-sjms/src/main/docs/sjms-batch-component.adoc index cf8f2b2..3ed1d86 100644 --- a/components/camel-sjms/src/main/docs/sjms-batch-component.adoc +++ b/components/camel-sjms/src/main/docs/sjms-batch-component.adoc @@ -148,7 +148,7 @@ with the following path and query parameters: | **destinationName** | *Required* The destination name. Only queues are supported names may be prefixed by 'queue:'. 
| | String |======================================================================= -#### Query Parameters (22 parameters): +#### Query Parameters (23 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= @@ -171,6 +171,7 @@ with the following path and query parameters: | **asyncStartListener** (advanced) | Whether to startup the consumer message listener asynchronously when starting a route. For example if a JmsConsumer cannot get a connection to a remote JMS broker then it may block while retrying and/or failover. This will cause Camel to block while starting routes. By setting this option to true you will let routes startup while the JmsConsumer connects to the JMS broker using a dedicated thread in asynchronous mode. If this option is used then beware that if the connection could not be established then an exception is logged at WARN level and the consumer will not be able to receive messages; You can then restart the route to retry. | false | boolean | **headerFilterStrategy** (advanced) | To use a custom HeaderFilterStrategy to filter header to and from Camel message. | | HeaderFilterStrategy | **jmsKeyFormatStrategy** (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides two implementations out of the box: default and passthrough. The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using the notation. | | JmsKeyFormatStrategy +| **keepAliveDelay** (advanced) | The delay in millis between attempts to re-establish a valid session. 
If this is a positive value the SjmsBatchConsumer will attempt to create a new session if it sees an IllegalStateException during message consumption. This delay value allows you to pause between attempts to prevent spamming the logs. If this is a negative value (default is -1) then the SjmsBatchConsumer will behave as it always has before - that is it will bail out and the route will shut down if it sees an IllegalStateException. | -1 | int | **messageCreatedStrategy** (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. | | MessageCreatedStrategy | **recoveryInterval** (advanced) | Specifies the interval between recovery attempts i.e. when a connection is being refreshed in milliseconds. The default is 5000 ms that is 5 seconds. | 5000 | int | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java index c386c66..a32cc3d 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java @@ -322,12 +322,12 @@ public class SjmsBatchConsumer extends DefaultConsumer { } } catch (javax.jms.IllegalStateException ex) { // from consumeBatchesOnLoop - // if keepAliveDelay was not specified just rethrow to break the loop. 
This preserves original default behavior - if(keepAliveDelay == -1) throw ex; + // if keepAliveDelay was not specified (defaults to -1) just rethrow to break the loop. This preserves original default behavior + if(keepAliveDelay < 0) throw ex; // this will log the exception and the parent loop will create a new session getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex); //sleep to avoid log spamming - Thread.sleep(keepAliveDelay); + if(keepAliveDelay > 0) Thread.sleep(keepAliveDelay); } finally { closeJmsSession(session); } http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java index 2e8affb..395c23f 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java @@ -399,8 +399,18 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt return recoveryInterval; } + /** + * The delay in millis between attempts to re-establish a valid session. + * If this is a positive value the SjmsBatchConsumer will attempt to create a new session if it sees an IllegalStateException + * during message consumption. This delay value allows you to pause between attempts to prevent spamming the logs. + * If this is a negative value (default is -1) then the SjmsBatchConsumer will behave as it always has before - that is + * it will bail out and the route will shut down if it sees an IllegalStateException. 
+ */ + public void setKeepAliveDelay(int keepAliveDelay) { + this.keepAliveDelay = keepAliveDelay; + } public int getKeepAliveDelay() { - return recoveryInterval; + return keepAliveDelay; } /** http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java index e378457..72610de 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java @@ -21,7 +21,6 @@ import java.util.Date; import java.util.List; import javax.jms.ConnectionFactory; -import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; @@ -29,6 +28,7 @@ import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.sjms.SjmsComponent; +import org.apache.camel.component.sjms.support.MockConnectionFactory; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.impl.SimpleRegistry; import org.apache.camel.test.junit4.CamelTestSupport; @@ -48,7 +48,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { public CamelContext createCamelContext() throws Exception { SimpleRegistry registry = new SimpleRegistry(); registry.put("testStrategy", new ListAggregationStrategy()); - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTcpConnectorUri()); + ConnectionFactory connectionFactory = new 
MockConnectionFactory(broker.getTcpConnectorUri()); SjmsComponent sjmsComponent = new SjmsComponent(); sjmsComponent.setConnectionFactory(connectionFactory); @@ -338,6 +338,51 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { } + @Test + public void testConsumptionBadSession() throws Exception { + + final int messageCount = 5; + final int consumerCount = 1; + SjmsBatchComponent sb = (SjmsBatchComponent)context.getComponent("sjms-batch"); + MockConnectionFactory cf = (MockConnectionFactory)sb.getConnectionFactory(); + cf.returnBadSessionNTimes(2); + + final String queueName = getQueueName(); + context.addRoutes(new TransactedSendHarness(queueName)); + context.addRoutes(new RouteBuilder() { + public void configure() throws Exception { + + int completionTimeout = 1000; + int completionSize = 200; + + // keepAliveDelay=300 is the key... it's a 300 millis delay between attempts to create a new session. + fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&consumerCount=%s&aggregationStrategy=#testStrategy&keepAliveDelay=300", + queueName, completionTimeout, completionSize, consumerCount) + .routeId("batchConsumer").startupOrder(10).autoStartup(false) + .split(body()) + .to("mock:split"); + } + }); + context.start(); + + MockEndpoint mockBefore = getMockEndpoint("mock:before"); + mockBefore.setExpectedMessageCount(messageCount); + + MockEndpoint mockSplit = getMockEndpoint("mock:split"); + mockSplit.setExpectedMessageCount(messageCount); + + LOG.info("Sending messages"); + template.sendBody("direct:in", generateStrings(messageCount)); + LOG.info("Send complete"); + + StopWatch stopWatch = new StopWatch(); + context.startRoute("batchConsumer"); + + assertMockEndpointsSatisfied(); + long time = stopWatch.stop(); + + } + private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) { Exchange exchange = mockEndpoint.getExchanges().get(0); assertEquals(expectedLength, exchange.getIn().getBody(List.class).size()); 
http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java new file mode 100644 index 0000000..00f06be --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java @@ -0,0 +1,43 @@ +package org.apache.camel.component.sjms.support; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.management.JMSStatsImpl; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.util.IdGenerator; + +import javax.jms.JMSException; +import javax.jms.Session; + +/** + * Created by bryan.love on 3/22/17. + */ +public class MockConnection extends ActiveMQConnection { + private int returnBadSessionNTimes = 0; + + protected MockConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats, int returnBadSessionNTimes) throws Exception { + super(transport, clientIdGenerator, connectionIdGenerator, factoryStats); + this.returnBadSessionNTimes = returnBadSessionNTimes; + } + + public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { + this.checkClosedOrFailed(); + this.ensureConnectionInfoSent(); + if(!transacted) { + if(acknowledgeMode == 0) { + throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session"); + } + + if(acknowledgeMode < 0 || acknowledgeMode > 4) { + throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". 
Valid values are Session.AUTO_ACKNOWLEDGE (1), Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)"); + } + } + + boolean useBadSession = false; + if(returnBadSessionNTimes > 0){ + useBadSession = true; + returnBadSessionNTimes = returnBadSessionNTimes - 1; + } + return new MockSession(this, this.getNextSessionId(), transacted?0:acknowledgeMode, this.isDispatchAsync(), this.isAlwaysSessionAsync(), useBadSession); + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java new file mode 100644 index 0000000..75cbe0f --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java @@ -0,0 +1,42 @@ +package org.apache.camel.component.sjms.support; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.management.JMSStatsImpl; +import org.apache.activemq.transport.Transport; + +import javax.jms.Connection; +import javax.jms.JMSException; +import java.net.URI; +import java.net.URISyntaxException; + +/** + * Created by bryan.love on 3/22/17. 
+ */ +public class MockConnectionFactory extends ActiveMQConnectionFactory { + private int returnBadSessionNTimes = 0; + + public Connection createConnection() throws JMSException { + return this.createActiveMQConnection(); + } + public MockConnectionFactory(String brokerURL) { + super(createURI(brokerURL)); + } + private static URI createURI(String brokerURL) { + try { + return new URI(brokerURL); + } catch (URISyntaxException var2) { + throw (IllegalArgumentException)(new IllegalArgumentException("Invalid broker URI: " + brokerURL)).initCause(var2); + } + } + + protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { + MockConnection connection = new MockConnection(transport, this.getClientIdGenerator(), this.getConnectionIdGenerator(), stats, returnBadSessionNTimes); + return connection; + } + + public void returnBadSessionNTimes(int returnBadSessionNTimes) { + this.returnBadSessionNTimes = returnBadSessionNTimes; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java new file mode 100644 index 0000000..624c152 --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java @@ -0,0 +1,29 @@ +package org.apache.camel.component.sjms.support; + +import org.apache.activemq.ActiveMQMessageConsumer; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.MessageDispatch; + +import javax.jms.IllegalStateException; +import 
javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + +/** + * Created by bryan.love on 3/22/17. + */ +public class MockMessageConsumer extends ActiveMQMessageConsumer{ + private boolean isBadSession; + + public MockMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync, MessageListener messageListener, boolean isBadSession) throws JMSException { + super(session, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocal, browser, dispatchAsync, messageListener); + this.isBadSession = isBadSession; + } + + public Message receive(long timeout) throws JMSException { + if(isBadSession) throw new IllegalStateException("asdf"); + return super.receive(timeout); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java new file mode 100644 index 0000000..4290e34 --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java @@ -0,0 +1,45 @@ +package org.apache.camel.component.sjms.support; + +import org.apache.activemq.*; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.SessionId; + +import javax.jms.*; + +/** + * Created by bryan.love on 3/22/17. 
+ */ +public class MockSession extends ActiveMQSession { + private boolean isBadSession = false; + + protected MockSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch, boolean isBadSession) throws JMSException { + super(connection, sessionId, acknowledgeMode, asyncDispatch, sessionAsyncDispatch); + this.isBadSession = isBadSession; + } + public Queue createQueue(String queueName) throws JMSException { + this.checkClosed(); + return (Queue)(queueName.startsWith("ID:")?new ActiveMQTempQueue(queueName):new ActiveMQQueue(queueName)); + } + + public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { + this.checkClosed(); + if(destination instanceof CustomDestination) { + CustomDestination prefetchPolicy1 = (CustomDestination)destination; + return prefetchPolicy1.createConsumer(this, messageSelector, noLocal); + } else { + ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); + boolean prefetch = false; + int prefetch1; + if(destination instanceof Topic) { + prefetch1 = prefetchPolicy.getTopicPrefetch(); + } else { + prefetch1 = prefetchPolicy.getQueuePrefetch(); + } + + ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); + return new MockMessageConsumer(this, this.getNextConsumerId(), activemqDestination, (String)null, messageSelector, prefetch1, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, this.isAsyncDispatch(), messageListener, isBadSession); + } + } +}