CAMEL-7421: Fixed CS (checkstyle violations)
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/00e93fc3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/00e93fc3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/00e93fc3 Branch: refs/heads/master Commit: 00e93fc3b5664b3b830a163819c285a0150532f5 Parents: 3ad5018 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Dec 4 07:34:40 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Dec 4 07:34:40 2014 +0100 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQProducer.java | 53 ++++++++++++-------- .../rabbitmq/pool/PoolableChannelFactory.java | 17 ++++--- .../rabbitmq/RabbitMQSpringIntTest.java | 33 ++++++------ 3 files changed, 59 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/00e93fc3/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index 755fa93..54562d5 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -16,20 +16,20 @@ */ package org.apache.camel.component.rabbitmq; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import org.apache.camel.Exchange; -import org.apache.camel.impl.DefaultProducer; -import org.apache.camel.util.ObjectHelper; - import java.io.IOException; import java.math.BigDecimal; import java.util.Date; import java.util.HashMap; import 
java.util.Map; import java.util.concurrent.ExecutorService; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.apache.camel.Exchange; import org.apache.camel.component.rabbitmq.pool.PoolableChannelFactory; +import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.ObjectHelper; import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; @@ -42,11 +42,12 @@ public class RabbitMQProducer extends DefaultProducer { */ private int channelPoolMaxSize = 10; /** - * Maximum time (in milliseconds) waiting for channel - */ + * Maximum time (in milliseconds) waiting for channel + */ private long channelPoolMaxWait = 1000; private ObjectPool<Channel> channelPool; private ExecutorService executorService; + public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException { super(endpoint); } @@ -55,12 +56,14 @@ public class RabbitMQProducer extends DefaultProducer { public RabbitMQEndpoint getEndpoint() { return (RabbitMQEndpoint) super.getEndpoint(); } + /** * Channel callback (similar to Spring JDBC ConnectionCallback) */ private static interface ChannelCallback<T> { - public T doWithChannel(Channel channel) throws Exception; + T doWithChannel(Channel channel) throws Exception; } + /** * Do something with a pooled channel (similar to Spring JDBC TransactionTemplate#execute) */ @@ -72,6 +75,7 @@ public class RabbitMQProducer extends DefaultProducer { channelPool.returnObject(channel); } } + /** * Open connection and initialize channel pool */ @@ -81,7 +85,7 @@ public class RabbitMQProducer extends DefaultProducer { log.debug("Created connection: {}", conn); log.trace("Creating channel pool..."); - channelPool = new GenericObjectPool<>(new PoolableChannelFactory(this.conn), getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait()); + channelPool = new GenericObjectPool<Channel>(new 
PoolableChannelFactory(this.conn), getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait()); if (getEndpoint().isDeclare()) { execute(new ChannelCallback<Void>() { @Override @@ -147,14 +151,15 @@ public class RabbitMQProducer extends DefaultProducer { } /** - * Send a message borrowing a channel from the pool - * @param exchange Target exchange + * Send a message borrowing a channel from the pool. + * + * @param exchange Target exchange * @param routingKey Routing key * @param properties Header properties - * @param body Body content + * @param body Body content */ private void basicPublish(final String exchange, final String routingKey, final AMQP.BasicProperties properties, final byte[] body) throws Exception { - if (channelPool==null) { + if (channelPool == null) { // Open connection and channel lazily openConnectionAndChannelPool(); } @@ -174,7 +179,7 @@ public class RabbitMQProducer extends DefaultProducer { if (contentType != null) { properties.contentType(contentType.toString()); } - + final Object priority = exchange.getIn().getHeader(RabbitMQConstants.PRIORITY); if (priority != null) { properties.priority(Integer.parseInt(priority.toString())); @@ -263,8 +268,8 @@ public class RabbitMQProducer extends DefaultProducer { /** * Strategy to test if the given header is valid * - * @param headerValue the header value - * @return the value to use, <tt>null</tt> to ignore this header + * @param headerValue the header value + * @return the value to use, <tt>null</tt> to ignore this header * @see com.rabbitmq.client.impl.Frame#fieldValueSize */ private Object getValidRabbitMQHeaderValue(Object headerValue) { @@ -294,33 +299,37 @@ public class RabbitMQProducer extends DefaultProducer { /** * Get maximum number of opened channel in pool + * * @return Maximum number of opened channel in pool */ public int getChannelPoolMaxSize() { - return channelPoolMaxSize; + return channelPoolMaxSize; } /** * Set maximum number of opened channel in 
pool + * * @param channelPoolMaxSize Maximum number of opened channel in pool */ public void setChannelPoolMaxSize(int channelPoolMaxSize) { - this.channelPoolMaxSize = channelPoolMaxSize; + this.channelPoolMaxSize = channelPoolMaxSize; } /** * Get the maximum number of milliseconds to wait for a channel from the pool + * * @return Maximum number of milliseconds waiting for a channel */ public long getChannelPoolMaxWait() { - return channelPoolMaxWait; + return channelPoolMaxWait; } /** * Set the maximum number of milliseconds to wait for a channel from the pool + * * @param channelPoolMaxWait Maximum number of milliseconds waiting for a channel */ public void setChannelPoolMaxWait(long channelPoolMaxWait) { - this.channelPoolMaxWait = channelPoolMaxWait; + this.channelPoolMaxWait = channelPoolMaxWait; } } http://git-wip-us.apache.org/repos/asf/camel/blob/00e93fc3/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java index b9bed13..b10201f 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java @@ -1,9 +1,10 @@ -/* - * Copyright 2014 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -13,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.rabbitmq.pool; import com.rabbitmq.client.Channel; @@ -21,9 +21,10 @@ import com.rabbitmq.client.Connection; import org.apache.commons.pool.PoolableObjectFactory; /** - * Channel lifecyle manager: create, check and close channel + * Channel lifecycle manager: create, check and close channel */ public class PoolableChannelFactory implements PoolableObjectFactory<Channel> { + /** * Parent connection */ http://git-wip-us.apache.org/repos/asf/camel/blob/00e93fc3/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java index f65b909..6a3a3a5 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java @@ -16,39 +16,48 @@ */ package org.apache.camel.component.rabbitmq; -import com.rabbitmq.client.*; +import java.io.IOException; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import 
com.rabbitmq.client.Envelope; import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import java.io.IOException; -import java.util.HashMap; - import static org.junit.Assert.assertEquals; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + /** * Test RabbitMQ component with Spring DSL */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration public class RabbitMQSpringIntTest { + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQSpringIntTest.class); + @Produce(uri = "direct:rabbitMQ") protected ProducerTemplate template; @Autowired private ConnectionFactory connectionFactory; private Connection connection; private Channel channel; + private boolean isConnectionOpened() { - return connection!=null && connection.isOpen(); + return connection != null && connection.isOpen(); } + private Connection openConnection() throws IOException { if (!isConnectionOpened()) { LOGGER.info("Open connection"); @@ -56,9 +65,11 @@ public class RabbitMQSpringIntTest { } return connection; } + private boolean isChannelOpened() { return channel != null && channel.isOpen(); } + private Channel openChannel() throws IOException { if (!isChannelOpened()) { LOGGER.info("Open channel"); @@ -70,12 +81,6 @@ public class RabbitMQSpringIntTest { @Before public void bindQueueExchange() throws IOException { openChannel(); - /* - LOGGER.info("Declare exchange queue"); - channel.exchangeDeclare("ex2", "direct", true, false, new HashMap<String, Object>()); - channel.queueDeclare("q2", true, false, false, null); - channel.queueBind("q2", "ex2", "rk2"); - */ } @After @@ -118,7 +123,7 @@ public 
class RabbitMQSpringIntTest { } @Test - public void testSendCsutomConnectionFactory() throws Exception { + public void testSendCustomConnectionFactory() throws Exception { String body = "Hello Rabbit"; template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");