Author: sully6768 Date: Wed Sep 26 18:51:53 2012 New Revision: 1390665 URL: http://svn.apache.org/viewvc?rev=1390665&view=rev Log: Create ConnectionResource integration test and clean up current ITests.
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java camel/trunk/components/camel-sjms/src/test/resources/log4j.properties Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java?rev=1390665&r1=1390664&r2=1390665&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java Wed Sep 26 18:51:53 2012 @@ -63,9 +63,9 @@ public class DefaultConsumer extends Sjm protected void destroyObject(MessageConsumerResources model) throws Exception { if (model != null) { if (model.getMessageConsumer() != null) { - if (model.getMessageConsumer().getMessageListener() != null) { - model.getMessageConsumer().setMessageListener(null); - } +// if (model.getMessageConsumer().getMessageListener() != null) { +// model.getMessageConsumer().setMessageListener(null); +// } model.getMessageConsumer().close(); } Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java?rev=1390665&r1=1390664&r2=1390665&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java Wed Sep 26 18:51:53 2012 @@ -98,8 +98,10 @@ public class InOutProducer extends SjmsP @Override public void onMessage(Message message) { - logger.info("Message Received in the Consumer Pool"); - logger.info(" Message : {}", message); + if (logger.isDebugEnabled()) { + logger.debug("Message Received in the Consumer Pool"); + logger.debug(" Message : {}", message); + } try { Exchanger<Object> exchanger = exchangerMap.get(message.getJMSCorrelationID()); exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS); Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java?rev=1390665&r1=1390664&r2=1390665&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java (original) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java Wed Sep 26 18:51:53 2012 @@ -26,7 +26,9 @@ import org.apache.camel.util.StopWatch; import org.junit.Test; /** - * @version + * Integration test that verifies the ability of SJMS to correctly process + * asynchronous InOut exchanges from both the Producer and Consumer perspective + * using a namedReplyTo destination. */ public class AsyncJmsInOutIT extends JmsTestSupport { @@ -53,23 +55,13 @@ public class AsyncJmsInOutIT extends Jms return new RouteBuilder() { @Override public void configure() throws Exception { - // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages - // (there are delays in both routes) - // however due async routing, we can leverage the fact to let threads non blocked - // in the first route, and therefore can have the messages processed faster - // because we can have messages wait concurrently in both routes - // this means the async processing model is about 2x faster from("seda:start") - // we can only send at fastest the 100 msg in 5 sec due the delay - .delay(50) - .to("sjms:queue:bar?synchronous=false&exchangePattern=InOut") + .to("sjms:queue:in.foo?synchronous=false&namedReplyTo=out.bar&exchangePattern=InOut") .to("mock:result"); - from("sjms:queue:bar?synchronous=false&exchangePattern=InOut") + from("sjms:queue:in.foo?synchronous=false&exchangePattern=InOut") .log("Using ${threadName} to process ${body}") - // we can only process at fastest the 100 msg in 5 sec due the delay - .delay(50) .transform(body().prepend("Bye ")); } }; Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java?rev=1390665&r1=1390664&r2=1390665&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java (original) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java Wed Sep 26 18:51:53 2012 @@ -26,7 +26,9 @@ import org.apache.camel.util.StopWatch; import org.junit.Test; /** - * @version + * Integration test that verifies the ability of SJMS to correctly process + * asynchronous InOut exchanges from both the Producer and Consumer perspective + * using a temporary destination. */ public class AsyncJmsInOutTempDestIT extends JmsTestSupport { @@ -53,23 +55,13 @@ public class AsyncJmsInOutTempDestIT ext return new RouteBuilder() { @Override public void configure() throws Exception { - // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages - // (there are delays in both routes) - // however due async routing, we can leverage the fact to let threads non blocked - // in the first route, and therefore can have the messages processed faster - // because we can have messages wait concurrently in both routes - // this means the async processing model is about 2x faster from("seda:start") - // we can only send at fastest the 100 msg in 5 sec due the delay - .delay(50) - .to("sjms:in.out.temp.queue?exchangePattern=InOut&synchronous=false") + .to("sjms:in.foo.tempQ?synchronous=false&exchangePattern=InOut") .to("mock:result"); - from("sjms:in.out.temp.queue?exchangePattern=InOut&synchronous=false") + from("sjms:in.foo.tempQ?synchronous=false&exchangePattern=InOut") .log("Using ${threadName} to process ${body}") - // we can only process at fastest the 100 msg in 5 sec due the delay - .delay(50) .transform(body().prepend("Bye ")); } }; Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java?rev=1390665&view=auto ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java (added) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java Wed Sep 26 18:51:53 2012 @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.it; + +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Session; + +import org.apache.activemq.pool.PooledConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sjms.SjmsComponent; +import org.apache.camel.component.sjms.jms.ConnectionResource; +import org.apache.camel.component.sjms.support.JmsTestSupport; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.util.StopWatch; +import org.junit.Test; + +/** + * Integration test that verifies we can replace the internal + * ConnectionFactoryResource with another provider. + * + */ +public class ConnectionResourceIT extends JmsTestSupport { + + /** + * Test method for + * {@link org.apache.camel.component.sjms.jms.ObjectPool#returnObject(java.lang.Object)} + * . + * + * @throws Exception + */ + @Test + public void testCreateConnections() throws Exception { + ConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1); + assertNotNull(pool); + Connection connection = pool.borrowConnection(); + assertNotNull(connection); + assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)); + pool.returnConnection(connection); + Connection connection2 = pool.borrowConnection(); + assertNotNull(connection2); + } + + @Test + public void testAsynchronous() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(100); + mock.expectsNoDuplicates(body()); + + StopWatch watch = new StopWatch(); + + for (int i = 0; i < 100; i++) { + template.sendBody("seda:start", "" + i); + } + + // just in case we run on slow boxes + assertMockEndpointsSatisfied(20, TimeUnit.SECONDS); + + log.info("Took " + watch.stop() + " ms. to process 100 messages request/reply over JMS"); + } + + /* + * @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext() + * @return + * @throws Exception + */ + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = new DefaultCamelContext(); + AMQConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1); + SjmsComponent component = new SjmsComponent(); + component.setConnectionResource(pool); + camelContext.addComponent("sjms", component); + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + + from("seda:start") + .to("sjms:queue:in.foo?namedReplyTo=out.bar&exchangePattern=InOut&producerCount=5") + .to("mock:result"); + + from("sjms:queue:in.foo?exchangePattern=InOut&consumerCount=20") + .log("Using ${threadName} to process ${body}") + .transform(body().prepend("Bye ")); + } + }; + } + + public class AMQConnectionResource implements ConnectionResource { + private PooledConnectionFactory pcf; + + public AMQConnectionResource(String connectString, int maxConnections) { + super(); + pcf = new PooledConnectionFactory(connectString); + pcf.setMaxConnections(maxConnections); + pcf.start(); + } + + public void stop() { + pcf.stop(); + } + + @Override + public Connection borrowConnection() throws Exception { + Connection answer = pcf.createConnection(); + answer.start(); + return answer; + } + + @Override + public Connection borrowConnection(long timeout) throws Exception { + Connection answer = null; + int counter = 0; + while (counter++ < timeout) { + answer = pcf.createConnection(); + if (answer == null) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + log.info("Intrupted but ignoring: ", e.getMessage()); + } + } else { + break; + } + } + if (answer != null) { + answer.start(); + } + return answer; + } + + @Override + public void returnConnection(Connection connection) throws Exception { + // Do nothing in this case since the PooledConnectionFactory takes + // care of this for us + log.info("Connection returned"); + } + } +} Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java?rev=1390665&r1=1390664&r2=1390665&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java (original) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java Wed Sep 26 18:51:53 2012 @@ -25,8 +25,11 @@ import org.apache.camel.util.StopWatch; import org.junit.Test; + /** - * @version + * Integration test that verifies the ability of SJMS to correctly process + * synchronous InOut exchanges from both the Producer and Consumer perspective + * using a namedReplyTo destination. */ public class SyncJmsInOutIT extends JmsTestSupport { @@ -53,23 +56,13 @@ public class SyncJmsInOutIT extends JmsT return new RouteBuilder() { @Override public void configure() throws Exception { - // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages - // (there are delays in both routes) - // however due async routing, we can leverage the fact to let threads non blocked - // in the first route, and therefore can have the messages processed faster - // because we can have messages wait concurrently in both routes - // this means the async processing model is about 2x faster from("seda:start") - // we can only send at fastest the 100 msg in 5 sec due the delay - .delay(50) - .to("sjms:queue:bar?exchangePattern=InOut") + .to("sjms:queue:in.foo?namedReplyTo=out.bar&exchangePattern=InOut") .to("mock:result"); - from("sjms:queue:bar?exchangePattern=InOut") + from("sjms:queue:in.foo?exchangePattern=InOut") .log("Using ${threadName} to process ${body}") - // we can only process at fastest the 100 msg in 5 sec due the delay - .delay(50) .transform(body().prepend("Bye ")); } }; Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java?rev=1390665&r1=1390664&r2=1390665&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java (original) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java Wed Sep 26 18:51:53 2012 @@ -25,8 +25,11 @@ import org.apache.camel.util.StopWatch; import org.junit.Test; + /** - * @version + * Integration test that verifies the ability of SJMS to correctly process + * synchronous InOut exchanges from both the Producer and Consumer perspective + * using a temporary destination. */ public class SyncJmsInOutTempDestIT extends JmsTestSupport { @@ -53,23 +56,12 @@ public class SyncJmsInOutTempDestIT exte return new RouteBuilder() { @Override public void configure() throws Exception { - // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages - // (there are delays in both routes) - // however due async routing, we can leverage the fact to let threads non blocked - // in the first route, and therefore can have the messages processed faster - // because we can have messages wait concurrently in both routes - // this means the async processing model is about 2x faster - from("seda:start") - // we can only send at fastest the 100 msg in 5 sec due the delay - .delay(50) - .to("sjms:in.out.temp.queue?exchangePattern=InOut") + .to("sjms:in.foo.tempQ?exchangePattern=InOut") .to("mock:result"); - from("sjms:in.out.temp.queue?exchangePattern=InOut") + from("sjms:in.foo.tempQ?exchangePattern=InOut") .log("Using ${threadName} to process ${body}") - // we can only process at fastest the 100 msg in 5 sec due the delay - .delay(50) .transform(body().prepend("Bye ")); } }; Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java?rev=1390665&r1=1390664&r2=1390665&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java (original) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java Wed Sep 26 18:51:53 2012 @@ -29,8 +29,8 @@ import org.apache.camel.impl.DefaultCame import org.apache.camel.test.junit4.CamelTestSupport; /** - * TODO Add Class documentation for JmsTestSupport - * + * A support class that builds up and tears down an ActiveMQ instance to be used + * for unit testing. */ public class JmsTestSupport extends CamelTestSupport { private static final String BROKER_URI = "tcp://localhost:33333"; @@ -40,20 +40,21 @@ public class JmsTestSupport extends Came private Connection connection; private Session session; + /** + * Set up the Broker + * + * @see org.apache.camel.test.junit4.CamelTestSupport#doPreSetup() + * + * @throws Exception + */ @Override protected void doPreSetup() throws Exception { - super.doPreSetup(); - } - - @Override - public void setUp() throws Exception { broker = new BrokerService(); broker.setUseJmx(true); broker.setPersistent(false); broker.deleteAllMessages(); broker.addConnector(BROKER_URI); broker.start(); - super.setUp(); } @Override @@ -79,10 +80,9 @@ public class JmsTestSupport extends Came broker = null; } } - + /* * @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext() - * * @return * @throws Exception */ @@ -99,7 +99,7 @@ public class JmsTestSupport extends Came camelContext.addComponent("sjms", component); return camelContext; } - + public void setSession(Session session) { this.session = session; } Modified: camel/trunk/components/camel-sjms/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/resources/log4j.properties?rev=1390665&r1=1390664&r2=1390665&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/test/resources/log4j.properties (original) +++ camel/trunk/components/camel-sjms/src/test/resources/log4j.properties Wed Sep 26 18:51:53 2012 @@ -20,8 +20,8 @@ log4j.rootLogger=INFO, out # uncomment the following line to turn on Camel debugging -log4j.logger.org.apache.activemq=info -log4j.logger.org.apache.activemq.transaction=trace +log4j.logger.org.apache.activemq=warn +log4j.logger.org.apache.activemq.transaction=warn log4j.logger.org.apache.camel=info log4j.logger.org.apache.camel.converter=info log4j.logger.org.apache.camel.component.sjms=info