Fixed NPE in rabbitmq and polished the component.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/423d2e8f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/423d2e8f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/423d2e8f Branch: refs/heads/master Commit: 423d2e8f26eb1bd7c69900aaa603b897c55c144a Parents: 82419d2 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jul 30 13:36:33 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Jul 30 16:28:00 2013 +0200 ---------------------------------------------------------------------- components/camel-rabbitmq/pom.xml | 130 ++++++++------- .../component/rabbitmq/RabbitMQComponent.java | 23 ++- .../component/rabbitmq/RabbitMQConstants.java | 2 - .../component/rabbitmq/RabbitMQConsumer.java | 14 +- .../component/rabbitmq/RabbitMQEndpoint.java | 165 ++++++++++--------- .../component/rabbitmq/RabbitMQProducer.java | 26 +-- .../rabbitmq/RabbitMQComponentTest.java | 2 +- .../rabbitmq/RabbitMQEndpointTest.java | 25 ++- .../src/test/resources/log4j.properties | 4 +- 9 files changed, 211 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/423d2e8f/components/camel-rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml index 4218b03..d3dd493 100644 --- a/components/camel-rabbitmq/pom.xml +++ b/components/camel-rabbitmq/pom.xml @@ -15,7 +15,8 @@ See the License for the specific language governing permissions and limitations under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> @@ -29,70 +30,75 @@ <packaging>bundle</packaging> <name>Camel :: RabbitMQ</name> <description>Camel RabbitMQ Component</description> - + <properties> - <camel.osgi.export.pkg> - org.apache.camel.component.rabbitmq.* - </camel.osgi.export.pkg> - <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=rabbitmq</camel.osgi.export.service> + <camel.osgi.export.pkg> + org.apache.camel.component.rabbitmq.* + </camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=rabbitmq</camel.osgi.export.service> </properties> - <dependencies> - <dependency> - <groupId>com.rabbitmq</groupId> - <artifactId>amqp-client</artifactId> - <version>${rabbitmq-amqp-client-version}</version> - </dependency> - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-core</artifactId> - </dependency> - - <!-- testing --> - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-test</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <excludes> - <exclude>**/*IntTest*</exclude> - </excludes> - </configuration> - </plugin> - </plugins> - </build> + <dependencies> + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + <version>${rabbitmq-amqp-client-version}</version> + 
</dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> - <profiles> - <profile> - <id>itest</id> - <build> - <plugins> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <excludes> - <exclude>None</exclude> - </excludes> - <includes> - <include>**/*IntTest*</include> - </includes> - </configuration> - </plugin> - </plugins> - </build> - </profile> - </profiles> + <!-- testing --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/*IntTest*</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>itest</id> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludes> + <exclude>None</exclude> + </excludes> + <includes> + <include>**/*IntTest*</include> + </includes> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> </project> http://git-wip-us.apache.org/repos/asf/camel/blob/423d2e8f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java index 859a86d..03cf896 100644 --- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java @@ -14,16 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.rabbitmq; +import java.net.URI; import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.impl.DefaultComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RabbitMQComponent extends DefaultComponent { + private static final transient Logger LOG = LoggerFactory.getLogger(RabbitMQComponent.class); + public RabbitMQComponent() { } @@ -35,8 +39,23 @@ public class RabbitMQComponent extends DefaultComponent { protected RabbitMQEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception { - RabbitMQEndpoint endpoint = new RabbitMQEndpoint(uri, remaining, this); + URI host = new URI("http://" + remaining); + String hostname = host.getHost(); + int portNumber = host.getPort(); + String exchangeName = host.getPath().substring(1); + + RabbitMQEndpoint endpoint = new RabbitMQEndpoint(uri, this); + endpoint.setHostname(hostname); + endpoint.setPortNumber(portNumber); + endpoint.setExchangeName(exchangeName); + setProperties(endpoint, params); + + if (LOG.isDebugEnabled()) { + LOG.debug("Creating RabbitMQEndpoint with host {}:{} and exchangeName: {}", + endpoint.getHostname(), endpoint.getPortNumber(), endpoint.getExchangeName()); + } + return endpoint; } } http://git-wip-us.apache.org/repos/asf/camel/blob/423d2e8f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java index 0fc1642..f57f561 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java @@ -14,10 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.rabbitmq; - public final class RabbitMQConstants { public static final String ROUTING_KEY = "rabbitmq.ROUTING_KEY"; http://git-wip-us.apache.org/repos/asf/camel/blob/423d2e8f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index 2f78d00..0927f81 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.camel.component.rabbitmq; import java.io.IOException; @@ -25,13 +24,10 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Envelope; - - import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; - public class RabbitMQConsumer extends DefaultConsumer { ExecutorService executor; @@ -86,7 +82,13 @@ public class RabbitMQConsumer extends DefaultConsumer { channel = null; conn = null; - executor.shutdown(); + if (executor != null) { + if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); + } else { + executor.shutdownNow(); + } + } executor = null; } @@ -123,7 +125,7 @@ public class RabbitMQConsumer extends DefaultConsumer { channel.basicAck(deliveryTag, false); } catch (Exception e) { - e.printStackTrace(); + getExceptionHandler().handleException("Error processing exchange", exchange, e); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/423d2e8f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index a62ad9c..2a7e2d6 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -14,11 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.camel.component.rabbitmq; import java.io.IOException; -import java.net.URI; import java.net.URISyntaxException; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -27,7 +25,6 @@ import java.util.concurrent.Executors; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Envelope; - import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -55,70 +52,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint { public RabbitMQEndpoint() { } - public RabbitMQEndpoint(String endpointUri, - String remaining, - RabbitMQComponent component) throws URISyntaxException { + public RabbitMQEndpoint(String endpointUri, RabbitMQComponent component) throws URISyntaxException { super(endpointUri, component); - - URI uri = new URI("http://" + remaining); - hostname = uri.getHost(); - portNumber = uri.getPort(); - exchangeName = uri.getPath().substring(1); - } - - - public String getExchangeName() { - return exchangeName; - } - - public void setQueue(String queue) { - this.queue = queue; - } - - public int getThreadPoolSize() { - return threadPoolSize; - } - - public void setThreadPoolSize(int threadPoolSize) { - this.threadPoolSize = threadPoolSize; - } - - public boolean isAutoAck() { - return autoAck; - } - - public void setAutoAck(boolean autoAck) { - this.autoAck = autoAck; - } - - public String getQueue() { - return queue; - } - - public boolean isAutoDelete() { - return autoDelete; } - public void setAutoDelete(boolean autoDelete) { - this.autoDelete = autoDelete; - } - - public boolean isDurable() { - return durable; - } - - public void setDurable(boolean durable) { - this.durable = durable; - } - - public String getRoutingKey() { - return routingKey; - } - - public void setRoutingKey(String routingKey) { - this.routingKey = routingKey; - } - public Exchange createRabbitExchange(Envelope envelope) { Exchange exchange = new 
DefaultExchange(getCamelContext(), getExchangePattern()); @@ -163,20 +100,12 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return true; } - public int getPortNumber() { - return portNumber; - } - - public String getHostname() { - return hostname; - } - - public String getVhost() { - return vhost; - } - - public String getPassword() { - return password; + protected ExecutorService createExecutor() { + if (getCamelContext() != null) { + return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "RabbitMQConsumer", getThreadPoolSize()); + } else { + return Executors.newFixedThreadPool(getThreadPoolSize()); + } } public String getUsername() { @@ -187,15 +116,91 @@ public class RabbitMQEndpoint extends DefaultEndpoint { this.username = username; } + public String getPassword() { + return password; + } + public void setPassword(String password) { this.password = password; } + public String getVhost() { + return vhost; + } + public void setVhost(String vhost) { this.vhost = vhost; } - public ExecutorService createExecutor() { - return Executors.newFixedThreadPool(getThreadPoolSize()); + public String getHostname() { + return hostname; + } + + public void setHostname(String hostname) { + this.hostname = hostname; + } + + public int getThreadPoolSize() { + return threadPoolSize; + } + + public void setThreadPoolSize(int threadPoolSize) { + this.threadPoolSize = threadPoolSize; + } + + public int getPortNumber() { + return portNumber; + } + + public void setPortNumber(int portNumber) { + this.portNumber = portNumber; + } + + public boolean isAutoAck() { + return autoAck; + } + + public void setAutoAck(boolean autoAck) { + this.autoAck = autoAck; + } + + public boolean isAutoDelete() { + return autoDelete; + } + + public void setAutoDelete(boolean autoDelete) { + this.autoDelete = autoDelete; + } + + public boolean isDurable() { + return durable; + } + + public void setDurable(boolean durable) { + this.durable = durable; + } + + public String 
getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public String getExchangeName() { + return exchangeName; + } + + public void setExchangeName(String exchangeName) { + this.exchangeName = exchangeName; + } + + public String getRoutingKey() { + return routingKey; + } + + public void setRoutingKey(String routingKey) { + this.routingKey = routingKey; } } http://git-wip-us.apache.org/repos/asf/camel/blob/423d2e8f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index 3cf7233..1336de9 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.camel.component.rabbitmq; import java.io.IOException; @@ -24,10 +23,9 @@ import java.util.concurrent.Executors; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; - import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; - +import org.apache.camel.util.ObjectHelper; public class RabbitMQProducer extends DefaultProducer { @@ -40,22 +38,30 @@ public class RabbitMQProducer extends DefaultProducer { this.channel = conn.createChannel(); } + @Override + public RabbitMQEndpoint getEndpoint() { + return (RabbitMQEndpoint) super.getEndpoint(); + } + public void shutdown() throws IOException { conn.close(); } @Override public void process(Exchange exchange) throws Exception { + String exchangeName = exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class); + if (exchangeName == null) { + exchangeName = getEndpoint().getExchangeName(); + } + if (ObjectHelper.isEmpty(exchangeName)) { + throw new IllegalArgumentException("ExchangeName is not provided in header " + RabbitMQConstants.EXCHANGE_NAME); + } - Object key = exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY); - String exchangeName = exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME).toString(); - byte[] messageBodyBytes = exchange.getIn().getBody(byte[].class); + String key = exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY, "", String.class); + byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class); AMQP.BasicProperties.Builder properties = buildProperties(exchange); - channel.basicPublish(exchangeName, - key == null ? 
"" : key.toString(), - properties.build(), - messageBodyBytes); + channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes); } AMQP.BasicProperties.Builder buildProperties(Exchange exchange) { http://git-wip-us.apache.org/repos/asf/camel/blob/423d2e8f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java index 7bf707f..244bc29 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java @@ -50,7 +50,7 @@ public class RabbitMQComponentTest { assertEquals("queuey", endpoint.getQueue()); assertEquals("vman", endpoint.getVhost()); assertEquals("special.host", endpoint.getHostname()); - assertEquals(14, endpoint.getPortNumber()); + assertEquals(14123, endpoint.getPortNumber()); assertEquals(515, endpoint.getThreadPoolSize()); assertEquals(true, endpoint.isAutoAck()); } http://git-wip-us.apache.org/repos/asf/camel/blob/423d2e8f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java index 7db87ea..f66ab85 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java @@ 
-16,25 +16,22 @@ */ package org.apache.camel.component.rabbitmq; -import java.net.URISyntaxException; import java.util.UUID; import java.util.concurrent.ThreadPoolExecutor; import com.rabbitmq.client.Envelope; - import org.apache.camel.Exchange; -import org.apache.camel.test.junit4.TestSupport; +import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; import org.mockito.Mockito; -public class RabbitMQEndpointTest extends TestSupport { +public class RabbitMQEndpointTest extends CamelTestSupport { private Envelope envelope = Mockito.mock(Envelope.class); @Test - public void testCreatingRabbitExchangeSetsHeaders() throws URISyntaxException { - RabbitMQEndpoint endpoint = - new RabbitMQEndpoint("rabbitmq:localhost/exchange", "localhost/exchange", new RabbitMQComponent()); + public void testCreatingRabbitExchangeSetsHeaders() throws Exception { + RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange", RabbitMQEndpoint.class); String routingKey = UUID.randomUUID().toString(); String exchangeName = UUID.randomUUID().toString(); @@ -52,19 +49,17 @@ public class RabbitMQEndpointTest extends TestSupport { @Test public void creatingExecutorUsesThreadPoolSettings() throws Exception { - - RabbitMQEndpoint endpoint = - new RabbitMQEndpoint("rabbitmq:localhost/exchange", "localhost/exchange", new RabbitMQComponent()); - endpoint.setThreadPoolSize(400); + RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?threadPoolSize=20", RabbitMQEndpoint.class); + assertEquals(20, endpoint.getThreadPoolSize()); ThreadPoolExecutor executor = assertIsInstanceOf(ThreadPoolExecutor.class, endpoint.createExecutor()); - assertEquals(400, executor.getCorePoolSize()); + assertEquals(20, executor.getCorePoolSize()); } @Test - public void assertSingleton() throws URISyntaxException { - RabbitMQEndpoint endpoint = - new RabbitMQEndpoint("rabbitmq:localhost/exchange", "localhost/exchange", new RabbitMQComponent()); + public void 
assertSingleton() throws Exception { + RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange", RabbitMQEndpoint.class); + assertTrue(endpoint.isSingleton()); } } http://git-wip-us.apache.org/repos/asf/camel/blob/423d2e8f/components/camel-rabbitmq/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/resources/log4j.properties b/components/camel-rabbitmq/src/test/resources/log4j.properties index 9e72bd9..6e75a1b 100644 --- a/components/camel-rabbitmq/src/test/resources/log4j.properties +++ b/components/camel-rabbitmq/src/test/resources/log4j.properties @@ -16,13 +16,13 @@ ## ------------------------------------------------------------------------ # -# The logging properties used for eclipse testing, We want to see debug output on the console. +# The logging properties used for testing. # log4j.rootLogger=INFO, file # uncomment the following to enable camel debugging #log4j.logger.org.apache.camel=DEBUG -#log4j.logger.org.apache.camel.component.quickfixj=DEBUG +#log4j.logger.org.apache.camel.component.rabbitmq=DEBUG # CONSOLE appender not used by default log4j.appender.out=org.apache.log4j.ConsoleAppender