added documentation and test case

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/94496488
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/94496488
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/94496488

Branch: refs/heads/master
Commit: 944964888ed512501ed7495f51dc7468a3059c46
Parents: bd6b87c
Author: Bryan Love <bryan.l...@iovation.com>
Authored: Thu Mar 23 14:04:36 2017 -0700
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Mar 28 10:03:54 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/sjms-batch-component.adoc     |  3 +-
 .../component/sjms/batch/SjmsBatchConsumer.java |  6 +--
 .../component/sjms/batch/SjmsBatchEndpoint.java | 12 ++++-
 .../sjms/batch/SjmsBatchConsumerTest.java       | 49 +++++++++++++++++++-
 .../component/sjms/support/MockConnection.java  | 43 +++++++++++++++++
 .../sjms/support/MockConnectionFactory.java     | 42 +++++++++++++++++
 .../sjms/support/MockMessageConsumer.java       | 29 ++++++++++++
 .../component/sjms/support/MockSession.java     | 45 ++++++++++++++++++
 8 files changed, 222 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/docs/sjms-batch-component.adoc 
b/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
index cf8f2b2..3ed1d86 100644
--- a/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
+++ b/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
@@ -148,7 +148,7 @@ with the following path and query parameters:
 | **destinationName** | *Required* The destination name. Only queues are 
supported names may be prefixed by 'queue:'. |  | String
 |=======================================================================
 
-#### Query Parameters (22 parameters):
+#### Query Parameters (23 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -171,6 +171,7 @@ with the following path and query parameters:
 | **asyncStartListener** (advanced) | Whether to startup the consumer message 
listener asynchronously when starting a route. For example if a JmsConsumer 
cannot get a connection to a remote JMS broker then it may block while retrying 
and/or failover. This will cause Camel to block while starting routes. By 
setting this option to true you will let routes startup while the JmsConsumer 
connects to the JMS broker using a dedicated thread in asynchronous mode. If 
this option is used then beware that if the connection could not be established 
then an exception is logged at WARN level and the consumer will not be able to 
receive messages; You can then restart the route to retry. | false | boolean
 | **headerFilterStrategy** (advanced) | To use a custom HeaderFilterStrategy 
to filter header to and from Camel message. |  | HeaderFilterStrategy
 | **jmsKeyFormatStrategy** (advanced) | Pluggable strategy for encoding and 
decoding JMS keys so they can be compliant with the JMS specification. Camel 
provides two implementations out of the box: default and passthrough. The 
default strategy will safely marshal dots and hyphens (. and -). The 
passthrough strategy leaves the key as is. Can be used for JMS brokers which do 
not care whether JMS header keys contain illegal characters. You can provide 
your own implementation of the 
org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using the 
notation. |  | JmsKeyFormatStrategy
+| **keepAliveDelay** (advanced) | The delay in millis between attempts to 
re-establish a valid session. If this is zero or a positive value the 
SjmsBatchConsumer will attempt to create a new session if it sees an 
IllegalStateException during message consumption. A positive value pauses 
between attempts to prevent spamming the logs; zero retries without pausing. 
If this is a negative value (default is -1) then the SjmsBatchConsumer will 
behave as it always has before - that is, it will bail out and the route will 
shut down if it sees an IllegalStateException. | -1 | int
 | **messageCreatedStrategy** (advanced) | To use the given 
MessageCreatedStrategy which are invoked when Camel creates new instances of 
javax.jms.Message objects when Camel is sending a JMS message. |  | 
MessageCreatedStrategy
 | **recoveryInterval** (advanced) | Specifies the interval between recovery 
attempts i.e. when a connection is being refreshed in milliseconds. The default 
is 5000 ms that is 5 seconds. | 5000 | int
 | **synchronous** (advanced) | Sets whether synchronous processing should be 
strictly used or Camel is allowed to use asynchronous processing (if 
supported). | false | boolean

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index c386c66..a32cc3d 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -322,12 +322,12 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                         }
                     } catch (javax.jms.IllegalStateException ex) {
                         // from consumeBatchesOnLoop
-                        // if keepAliveDelay was not specified just rethrow to 
break the loop. This preserves original default behavior
-                        if(keepAliveDelay == -1) throw ex;
+                        // if keepAliveDelay was not specified (defaults to 
-1) just rethrow to break the loop. This preserves original default behavior
+                        if(keepAliveDelay < 0) throw ex;
                         // this will log the exception and the parent loop 
will create a new session
                         getExceptionHandler().handleException("Exception 
caught consuming from " + destinationName, ex);
                         //sleep to avoid log spamming
-                        Thread.sleep(keepAliveDelay);
+                        if(keepAliveDelay > 0) Thread.sleep(keepAliveDelay);
                     } finally {
                         closeJmsSession(session);
                     }

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 2e8affb..395c23f 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -399,8 +399,18 @@ public class SjmsBatchEndpoint extends DefaultEndpoint 
implements HeaderFilterSt
         return recoveryInterval;
     }
 
+    /**
+     * The delay in millis between attempts to re-establish a valid session.
+     * If this is zero or a positive value the SjmsBatchConsumer will attempt to 
create a new session if it sees an IllegalStateException
+     * during message consumption. A positive value pauses between attempts to 
prevent spamming the logs; zero retries without pausing.
+     * If this is a negative value (default is -1) then the SjmsBatchConsumer 
will behave as it always has before - that is,
+     * it will bail out and the route will shut down if it sees an 
IllegalStateException.
+     */
+    public void setKeepAliveDelay(int keepAliveDelay) {
+         this.keepAliveDelay = keepAliveDelay;
+    }
     public int getKeepAliveDelay() {
-        return recoveryInterval;
+        return keepAliveDelay;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index e378457..72610de 100644
--- 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -21,7 +21,6 @@ import java.util.Date;
 import java.util.List;
 import javax.jms.ConnectionFactory;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
@@ -29,6 +28,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.support.MockConnectionFactory;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.SimpleRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -48,7 +48,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
     public CamelContext createCamelContext() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
         registry.put("testStrategy", new ListAggregationStrategy());
-        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(broker.getTcpConnectorUri());
+        ConnectionFactory connectionFactory = new 
MockConnectionFactory(broker.getTcpConnectorUri());
 
         SjmsComponent sjmsComponent = new SjmsComponent();
         sjmsComponent.setConnectionFactory(connectionFactory);
@@ -338,6 +338,51 @@ public class SjmsBatchConsumerTest extends 
CamelTestSupport {
 
     }
 
+    @Test
+    public void testConsumptionBadSession() throws Exception {
+
+        final int messageCount = 5;
+        final int consumerCount = 1;
+        SjmsBatchComponent sb = 
(SjmsBatchComponent)context.getComponent("sjms-batch");
+        MockConnectionFactory cf = 
(MockConnectionFactory)sb.getConnectionFactory();
+        cf.returnBadSessionNTimes(2);
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+
+                int completionTimeout = 1000;
+                int completionSize = 200;
+
+                // keepAliveDelay=300 is the key... it's a 300 millis delay 
between attempts to create a new session.
+                
fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&consumerCount=%s&aggregationStrategy=#testStrategy&keepAliveDelay=300",
+                        queueName, completionTimeout, completionSize, 
consumerCount)
+                        
.routeId("batchConsumer").startupOrder(10).autoStartup(false)
+                        .split(body())
+                        .to("mock:split");
+            }
+        });
+        context.start();
+
+        MockEndpoint mockBefore = getMockEndpoint("mock:before");
+        mockBefore.setExpectedMessageCount(messageCount);
+
+        MockEndpoint mockSplit = getMockEndpoint("mock:split");
+        mockSplit.setExpectedMessageCount(messageCount);
+
+        LOG.info("Sending messages");
+        template.sendBody("direct:in", generateStrings(messageCount));
+        LOG.info("Send complete");
+
+        StopWatch stopWatch = new StopWatch();
+        context.startRoute("batchConsumer");
+
+        assertMockEndpointsSatisfied();
+        long time = stopWatch.stop();
+
+    }
+
     private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int 
expectedLength) {
         Exchange exchange = mockEndpoint.getExchanges().get(0);
         assertEquals(expectedLength, 
exchange.getIn().getBody(List.class).size());

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
new file mode 100644
index 0000000..00f06be
--- /dev/null
+++ 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
@@ -0,0 +1,43 @@
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.IdGenerator;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+/**
+ * Created by bryan.love on 3/22/17.
+ */
+public class MockConnection extends ActiveMQConnection {
+    private int returnBadSessionNTimes = 0;
+
+    protected MockConnection(final Transport transport, IdGenerator 
clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl 
factoryStats, int returnBadSessionNTimes) throws Exception {
+        super(transport,  clientIdGenerator,  connectionIdGenerator,  
factoryStats);
+        this.returnBadSessionNTimes = returnBadSessionNTimes;
+    }
+
+    public Session createSession(boolean transacted, int acknowledgeMode) 
throws JMSException {
+        this.checkClosedOrFailed();
+        this.ensureConnectionInfoSent();
+        if(!transacted) {
+            if(acknowledgeMode == 0) {
+                throw new JMSException("acknowledgeMode SESSION_TRANSACTED 
cannot be used for an non-transacted Session");
+            }
+
+            if(acknowledgeMode < 0 || acknowledgeMode > 4) {
+                throw new JMSException("invalid acknowledgeMode: " + 
acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), 
Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), 
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions 
Session.SESSION_TRANSACTED (0)");
+            }
+        }
+
+        boolean useBadSession = false;
+        if(returnBadSessionNTimes > 0){
+            useBadSession = true;
+            returnBadSessionNTimes = returnBadSessionNTimes - 1;
+        }
+        return new MockSession(this, this.getNextSessionId(), 
transacted?0:acknowledgeMode, this.isDispatchAsync(), 
this.isAlwaysSessionAsync(), useBadSession);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
new file mode 100644
index 0000000..75cbe0f
--- /dev/null
+++ 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
@@ -0,0 +1,42 @@
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.transport.Transport;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Created by bryan.love on 3/22/17.
+ */
+public class MockConnectionFactory extends ActiveMQConnectionFactory {
+    private int returnBadSessionNTimes = 0;
+
+    public Connection createConnection() throws JMSException {
+        return this.createActiveMQConnection();
+    }
+    public MockConnectionFactory(String brokerURL) {
+        super(createURI(brokerURL));
+    }
+    private static URI createURI(String brokerURL) {
+        try {
+            return new URI(brokerURL);
+        } catch (URISyntaxException var2) {
+            throw (IllegalArgumentException)(new 
IllegalArgumentException("Invalid broker URI: " + brokerURL)).initCause(var2);
+        }
+    }
+
+    protected ActiveMQConnection createActiveMQConnection(Transport transport, 
JMSStatsImpl stats) throws Exception {
+        MockConnection connection = new MockConnection(transport, 
this.getClientIdGenerator(), this.getConnectionIdGenerator(), stats, 
returnBadSessionNTimes);
+        return connection;
+    }
+
+    public void returnBadSessionNTimes(int returnBadSessionNTimes) {
+        this.returnBadSessionNTimes = returnBadSessionNTimes;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
new file mode 100644
index 0000000..624c152
--- /dev/null
+++ 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
@@ -0,0 +1,29 @@
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageDispatch;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+/**
+ * Created by bryan.love on 3/22/17.
+ */
+public class MockMessageConsumer extends ActiveMQMessageConsumer{
+    private boolean isBadSession;
+
+    public MockMessageConsumer(ActiveMQSession session, ConsumerId consumerId, 
ActiveMQDestination dest, String name, String selector, int prefetch, int 
maximumPendingMessageCount, boolean noLocal, boolean browser, boolean 
dispatchAsync, MessageListener messageListener, boolean isBadSession) throws 
JMSException {
+        super(session, consumerId, dest, name, selector, prefetch, 
maximumPendingMessageCount, noLocal, browser, dispatchAsync, messageListener);
+        this.isBadSession = isBadSession;
+    }
+
+    public Message receive(long timeout) throws JMSException {
+        if(isBadSession) throw new IllegalStateException("asdf");
+        return super.receive(timeout);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
new file mode 100644
index 0000000..4290e34
--- /dev/null
+++ 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
@@ -0,0 +1,45 @@
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.SessionId;
+
+import javax.jms.*;
+
+/**
+ * Created by bryan.love on 3/22/17.
+ */
+public class MockSession extends ActiveMQSession {
+    private boolean isBadSession = false;
+
+    protected MockSession(ActiveMQConnection connection, SessionId sessionId, 
int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch, 
boolean isBadSession) throws JMSException {
+        super(connection,  sessionId,  acknowledgeMode,  asyncDispatch,  
sessionAsyncDispatch);
+        this.isBadSession = isBadSession;
+    }
+    public Queue createQueue(String queueName) throws JMSException {
+        this.checkClosed();
+        return (Queue)(queueName.startsWith("ID:")?new 
ActiveMQTempQueue(queueName):new ActiveMQQueue(queueName));
+    }
+
+    public MessageConsumer createConsumer(Destination destination, String 
messageSelector, boolean noLocal, MessageListener messageListener) throws 
JMSException {
+        this.checkClosed();
+        if(destination instanceof CustomDestination) {
+            CustomDestination prefetchPolicy1 = (CustomDestination)destination;
+            return prefetchPolicy1.createConsumer(this, messageSelector, 
noLocal);
+        } else {
+            ActiveMQPrefetchPolicy prefetchPolicy = 
this.connection.getPrefetchPolicy();
+            boolean prefetch = false;
+            int prefetch1;
+            if(destination instanceof Topic) {
+                prefetch1 = prefetchPolicy.getTopicPrefetch();
+            } else {
+                prefetch1 = prefetchPolicy.getQueuePrefetch();
+            }
+
+            ActiveMQDestination activemqDestination = 
ActiveMQMessageTransformation.transformDestination(destination);
+            return new MockMessageConsumer(this, this.getNextConsumerId(), 
activemqDestination, (String)null, messageSelector, prefetch1, 
prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, 
this.isAsyncDispatch(), messageListener, isBadSession);
+        }
+    }
+}

Reply via email to