Updated Branches: refs/heads/master 4eed66cdf -> bf1f5f0cc
CAMEL-6534 Added camel-rabbitmq component with thanks to stephen Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0e2498b1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0e2498b1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0e2498b1 Branch: refs/heads/master Commit: 0e2498b12ceb74916d60bbeebbb86a5912315a10 Parents: 4eed66c Author: Willem Jiang <ningji...@apache.org> Authored: Mon Jul 15 18:43:15 2013 +0800 Committer: Willem Jiang <ningji...@apache.org> Committed: Mon Jul 15 18:59:58 2013 +0800 ---------------------------------------------------------------------- components/camel-rabbitmq/pom.xml | 122 ++++++++++++++ .../component/rabbitmq/RabbitMQComponent.java | 28 ++++ .../component/rabbitmq/RabbitMQConstants.java | 23 +++ .../component/rabbitmq/RabbitMQConsumer.java | 110 ++++++++++++ .../component/rabbitmq/RabbitMQEndpoint.java | 167 +++++++++++++++++++ .../component/rabbitmq/RabbitMQProducer.java | 104 ++++++++++++ .../org/apache/camel/component/rabbitmq | 1 + .../rabbitmq/RabbitMQComponentTest.java | 44 +++++ .../rabbitmq/RabbitMQConsumerIntTest.java | 64 +++++++ .../rabbitmq/RabbitMQConsumerTest.java | 56 +++++++ .../rabbitmq/RabbitMQEndpointTest.java | 58 +++++++ .../rabbitmq/RabbitMQProducerIntTest.java | 79 +++++++++ .../rabbitmq/RabbitMQProducerTest.java | 138 +++++++++++++++ .../camel-rabbitmq/src/test/resources/log4j.xml | 20 +++ components/pom.xml | 1 + 15 files changed, 1015 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml new file mode 100644 index 0000000..338df36 --- /dev/null +++ b/components/camel-rabbitmq/pom.xml @@ -0,0 +1,122 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.12-SNAPSHOT</version> + </parent> + + <groupId>org.apache.camel</groupId> + <artifactId>camel-rabbitmq</artifactId> + <packaging>bundle</packaging> + <name>Camel :: RabbitMQ</name> + <description>Camel RabbitMQ Component</description> + + <properties> + <camel.osgi.export.pkg> + org.apache.camel.component.rabbitmq.* + </camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=rabbitmq</camel.osgi.export.service> + </properties> + + <dependencies> + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + <version>3.1.3</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.5</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.5</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.9.5</version> + <scope>test</scope> + </dependency> + </dependencies> + + + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.0</version> + <configuration> + <source>1.6</source> + <target>1.6</target> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/*IntTest*</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>itest</id> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludes> + <exclude>None</exclude> + </excludes> + <includes> + <include>**/*IntTest*</include> + </includes> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java new file mode 100644 index 0000000..7055a6f --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java @@ -0,0 +1,28 @@ +package org.apache.camel.component.rabbitmq; + +import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultComponent; + +import java.util.Map; + +/** + * @author Stephen Samuel + */ +public class RabbitMQComponent extends DefaultComponent { + + public RabbitMQComponent() { + } + + public RabbitMQComponent(CamelContext context) { + super(context); + } + + @Override + protected RabbitMQEndpoint createEndpoint(String uri, + String remaining, + Map<String, Object> params) throws Exception { + RabbitMQEndpoint endpoint = new RabbitMQEndpoint(uri, remaining, this); + setProperties(endpoint, params); + return endpoint; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java new file mode 100644 index 0000000..d163361 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java @@ -0,0 +1,23 @@ +package org.apache.camel.component.rabbitmq; + +/** + * @author Stephen Samuel + */ +public class RabbitMQConstants { + public static final String ROUTING_KEY = "rabbitmq.ROUTING_KEY"; + public static final String EXCHANGE_NAME = "rabbitmq.EXCHANGE_NAME"; + public static final String CONTENT_TYPE = "rabbitmq.CONTENT_TYPE"; + public static final String PRIORITY = "rabbitmq.PRIORITY"; + public static final String DELIVERY_TAG = "rabbitmq.DELIVERY_TAG"; + public static final String CORRELATIONID = "rabbitmq.CORRELATIONID"; + public static final String MESSAGE_ID = "rabbitmq.MESSAGE_ID"; + public static final String DELIVERY_MODE = "rabbitmq.DELIVERY_MODE"; + public static final String USERID = "rabbitmq.USERID"; + public static final String CLUSTERID = "rabbitmq.CLUSTERID"; + public static final String REPLY_TO = "rabbitmq.REPLY_TO"; + public static final String CONTENT_ENCODING = "rabbitmq.CONTENT_ENCODING"; + public static final String TYPE = "rabbitmq.TYPE"; + public static final String EXPIRATION = "rabbitmq.EXPIRATION"; + public static final String TIMESTAMP = "rabbitmq.TIMESTAMP"; + public static final String APP_ID = "rabbitmq.APP_ID"; +} http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java new file mode 100644 index 0000000..e088568 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -0,0 +1,110 @@ +package org.apache.camel.component.rabbitmq; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Envelope; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.impl.DefaultConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +/** + * @author Stephen Samuel + */ +public class RabbitMQConsumer extends DefaultConsumer { + + private static final Logger logger = LoggerFactory.getLogger(RabbitMQConsumer.class); + + private final RabbitMQEndpoint endpoint; + + ExecutorService executor; + Connection conn; + Channel channel; + + public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) { + super(endpoint, processor); + this.endpoint = endpoint; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + log.info("Starting RabbitMQ consumer"); + + executor = endpoint.createExecutor(); + logger.debug("Using executor {}", executor); + + conn = endpoint.connect(executor); + logger.debug("Using conn {}", conn); + + channel = conn.createChannel(); + logger.debug("Using channel {}", channel); + + channel.exchangeDeclare(endpoint.getExchangeName(), "direct", true); + channel.queueDeclare(endpoint.getQueue(), true, false, false, null); + channel.queueBind(endpoint.getQueue(), endpoint.getExchangeName(), + endpoint.getRoutingKey() == null ? "" : endpoint.getRoutingKey()); + + channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), new RabbitConsumer(this, channel)); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + log.info("Stopping RabbitMQ consumer"); + if (conn != null) + try { + conn.close(); + } catch (Exception ignored) { } + + channel = null; + conn = null; + executor.shutdown(); + executor = null; + } + + class RabbitConsumer extends com.rabbitmq.client.DefaultConsumer { + + private final RabbitMQConsumer consumer; + private final Channel channel; + + /** + * Constructs a new instance and records its association to the passed-in channel. + * + * @param channel the channel to which this consumer is attached + */ + public RabbitConsumer(RabbitMQConsumer consumer, Channel channel) { + super(channel); + this.consumer = consumer; + this.channel = channel; + } + + @Override + public void handleDelivery(String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) + throws IOException { + + Exchange exchange = consumer.endpoint.createRabbitExchange(envelope); + logger.trace("Created exchange [exchange={}]", new Object[]{exchange}); + + try { + consumer.getProcessor().process(exchange); + + long deliveryTag = envelope.getDeliveryTag(); + logger.trace("Acknowleding receipt [delivery_tag={}]", deliveryTag); + channel.basicAck(deliveryTag, false); + + } catch (Exception e) { + e.printStackTrace(); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java new file mode 100644 index 0000000..94ec68c --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -0,0 +1,167 @@ +package org.apache.camel.component.rabbitmq; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Envelope; +import org.apache.camel.Consumer; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultMessage; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * @author Stephen Samuel + */ +public class RabbitMQEndpoint extends DefaultEndpoint { + + private String username; + private String password; + private String vhost; + private String hostname; + private int threadPoolSize = 10; + private int portNumber; + private boolean autoAck = true; + private String queue = String.valueOf(UUID.randomUUID().toString().hashCode()); + private String exchangeName; + private String routingKey; + + public String getExchangeName() { + return exchangeName; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public int getThreadPoolSize() { + return threadPoolSize; + } + + public void setThreadPoolSize(int threadPoolSize) { + this.threadPoolSize = threadPoolSize; + } + + public boolean isAutoAck() { + return autoAck; + } + + public void setAutoAck(boolean autoAck) { + this.autoAck = autoAck; + } + + public String getQueue() { + return queue; + } + + public String getRoutingKey() { + return routingKey; + } + + public void setRoutingKey(String routingKey) { + this.routingKey = routingKey; + } + + public RabbitMQEndpoint() { + } + + public RabbitMQEndpoint(String endpointUri, + String remaining, + RabbitMQComponent component) throws URISyntaxException { + super(endpointUri, component); + + URI uri = new URI("http://" + remaining); + hostname = uri.getHost(); + portNumber = uri.getPort(); + exchangeName = uri.getPath().substring(1); + } + + public Exchange createRabbitExchange(Envelope envelope) { + Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern()); + + Message message = new DefaultMessage(); + exchange.setIn(message); + + message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey()); + message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange()); + message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag()); + + return exchange; + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + RabbitMQConsumer consumer = new RabbitMQConsumer(this, processor); + configureConsumer(consumer); + return consumer; + } + + public Connection connect(ExecutorService executor) throws IOException { + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername(getUsername()); + factory.setPassword(getPassword()); + if (getVhost() == null) + factory.setVirtualHost("/"); + else + factory.setVirtualHost(getVhost()); + factory.setHost(getHostname()); + factory.setPort(getPortNumber()); + return factory.newConnection(executor); + } + + @Override + public Producer createProducer() throws Exception { + return new RabbitMQProducer(this); + } + + @Override + public boolean isSingleton() { + return true; + } + + public int getPortNumber() { + return portNumber; + } + + public String getHostname() { + return hostname; + } + + public String getVhost() { + return vhost; + } + + public String getPassword() { + return password; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public void setPassword(String password) { + this.password = password; + } + + public void setVhost(String vhost) { + this.vhost = vhost; + } + + public ThreadPoolExecutor createExecutor() { + return (ThreadPoolExecutor) Executors.newFixedThreadPool(getThreadPoolSize()); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java new file mode 100644 index 0000000..dcfdb2e --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -0,0 +1,104 @@ +package org.apache.camel.component.rabbitmq; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; + +import java.io.IOException; +import java.util.Date; +import java.util.concurrent.Executors; + +/** + * @author Stephen Samuel + */ +public class RabbitMQProducer extends DefaultProducer { + + private final RabbitMQEndpoint endpoint; + private Connection conn; + private Channel channel; + + public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException { + super(endpoint); + this.endpoint = endpoint; + this.conn = endpoint.connect(Executors.newSingleThreadExecutor()); + this.channel = conn.createChannel(); + } + + public void shutdown() throws IOException { + conn.close(); + } + + @Override + public void process(Exchange exchange) throws Exception { + + Object key = exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY); + String exchangeName = exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME).toString(); + byte[] messageBodyBytes = exchange.getIn().getBody(byte[].class); + AMQP.BasicProperties.Builder properties = buildProperties(exchange); + + channel.basicPublish(exchangeName, + key == null ? "" : key.toString(), + properties.build(), + messageBodyBytes); + } + + AMQP.BasicProperties.Builder buildProperties(Exchange exchange) { + AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); + + final Object contentType = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_TYPE); + if (contentType != null) + properties.contentType(contentType.toString()); + + final Object priority = exchange.getIn().getHeader(RabbitMQConstants.PRIORITY); + if (priority != null) + properties.priority(Integer.parseInt(priority.toString())); + + final Object messageId = exchange.getIn().getHeader(RabbitMQConstants.MESSAGE_ID); + if (messageId != null) + properties.messageId(messageId.toString()); + + final Object clusterId = exchange.getIn().getHeader(RabbitMQConstants.CLUSTERID); + if (clusterId != null) + properties.clusterId(clusterId.toString()); + + final Object replyTo = exchange.getIn().getHeader(RabbitMQConstants.REPLY_TO); + if (replyTo != null) + properties.replyTo(replyTo.toString()); + + final Object correlationId = exchange.getIn().getHeader(RabbitMQConstants.CORRELATIONID); + if (correlationId != null) + properties.correlationId(correlationId.toString()); + + final Object deliveryMode = exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_MODE); + if (deliveryMode != null) + properties.deliveryMode(Integer.parseInt(deliveryMode.toString())); + + final Object userId = exchange.getIn().getHeader(RabbitMQConstants.USERID); + if (userId != null) + properties.userId(userId.toString()); + + final Object type = exchange.getIn().getHeader(RabbitMQConstants.TYPE); + if (type != null) + properties.type(type.toString()); + + final Object contentEncoding = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_ENCODING); + if (contentEncoding != null) + properties.contentEncoding(contentEncoding.toString()); + + final Object expiration = exchange.getIn().getHeader(RabbitMQConstants.EXPIRATION); + if (expiration != null) + properties.expiration(expiration.toString()); + + final Object appId = exchange.getIn().getHeader(RabbitMQConstants.APP_ID); + if (appId != null) + properties.appId(appId.toString()); + + final Object timestamp = exchange.getIn().getHeader(RabbitMQConstants.TIMESTAMP); + if (timestamp != null) + properties.timestamp(new Date(Long.parseLong(timestamp.toString()))); + + return properties; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/main/resources/META-INF/services/org/apache/camel/component/rabbitmq ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/resources/META-INF/services/org/apache/camel/component/rabbitmq b/components/camel-rabbitmq/src/main/resources/META-INF/services/org/apache/camel/component/rabbitmq new file mode 100644 index 0000000..3211140 --- /dev/null +++ b/components/camel-rabbitmq/src/main/resources/META-INF/services/org/apache/camel/component/rabbitmq @@ -0,0 +1 @@ +class=org.apache.camel.component.rabbitmq.RabbitMQComponent \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java new file mode 100644 index 0000000..07b068f --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java @@ -0,0 +1,44 @@ +package org.apache.camel.component.rabbitmq; + +import org.apache.camel.CamelContext; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * @author Stephen Samuel + */ +public class RabbitMQComponentTest { + + private CamelContext context = Mockito.mock(CamelContext.class); + + @Test + public void testPropertiesSet() throws Exception { + Map<String, Object> params = new HashMap<String, Object>(); + params.put("username", "coldplay"); + params.put("password", "chrism"); + params.put("autoAck", true); + params.put("vhost", "vman"); + params.put("threadPoolSize", 515); + params.put("portNumber", 14123); + params.put("hostname", "special.host"); + params.put("queue", "queuey"); + + String uri = "rabbitmq:special.host:14/queuey"; + String remaining = "special.host:14/queuey"; + + RabbitMQEndpoint endpoint = new RabbitMQComponent(context).createEndpoint(uri, remaining, params); + assertEquals("chrism", endpoint.getPassword()); + assertEquals("coldplay", endpoint.getUsername()); + assertEquals("queuey", endpoint.getQueue()); + assertEquals("vman", endpoint.getVhost()); + assertEquals("special.host", endpoint.getHostname()); + assertEquals(14, endpoint.getPortNumber()); + assertEquals(515, endpoint.getThreadPoolSize()); + assertEquals(true, endpoint.isAutoAck()); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java new file mode 100644 index 0000000..6f4dfa1 --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java @@ -0,0 +1,64 @@ +package org.apache.camel.component.rabbitmq; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * @author Stephen Samuel + */ +public class RabbitMQConsumerIntTest extends CamelTestSupport { + + private static final Logger logger = LoggerFactory.getLogger(RabbitMQConsumerIntTest.class); + private static final String EXCHANGE = "ex1"; + + @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest") + private Endpoint from; + + @EndpointInject(uri = "mock:result") + private MockEndpoint to; + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + from(from).to(to); + } + }; + } + + @Test + public void sentMessageIsReceived() throws InterruptedException, IOException { + + to.expectedMessageCount(1); + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + factory.setPort(5672); + factory.setUsername("cameltest"); + factory.setPassword("cameltest"); + factory.setVirtualHost("/"); + Connection conn = factory.newConnection(); + + AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); + + Channel channel = conn.createChannel(); + channel.basicPublish(EXCHANGE, "", properties.build(), "hello world".getBytes()); + + to.assertIsSatisfied(); + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java new file mode 100644 index 0000000..26af07b --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java @@ -0,0 +1,56 @@ +package org.apache.camel.component.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.apache.camel.Processor; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author Stephen Samuel + */ +public class RabbitMQConsumerTest { + + private RabbitMQEndpoint endpoint = Mockito.mock(RabbitMQEndpoint.class); + private Connection conn = Mockito.mock(Connection.class); + private Processor processor = Mockito.mock(Processor.class); + private Channel channel = Mockito.mock(Channel.class); + + @Test + public void testStoppingConsumerShutsdownExecutor() throws Exception { + RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor); + + ThreadPoolExecutor e = (ThreadPoolExecutor) Executors.newFixedThreadPool(3); + Mockito.when(endpoint.createExecutor()).thenReturn(e); + Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn); + Mockito.when(conn.createChannel()).thenReturn(channel); + + consumer.doStart(); + assertFalse(e.isShutdown()); + + consumer.doStop(); + assertTrue(e.isShutdown()); + } + + @Test + public void testStoppingConsumerShutsdownConnection() throws Exception { + RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor); + + Mockito.when(endpoint.createExecutor()).thenReturn((ThreadPoolExecutor) Executors.newFixedThreadPool(3)); + Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn); + Mockito.when(conn.createChannel()).thenReturn(channel); + + consumer.doStart(); + consumer.doStop(); + + Mockito.verify(conn).close(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java new file mode 100644 index 0000000..dbdb34b --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java @@ -0,0 +1,58 @@ +package org.apache.camel.component.rabbitmq; + +import com.rabbitmq.client.Envelope; +import org.apache.camel.Exchange; +import org.junit.Test; +import org.mockito.Mockito; + +import java.net.URISyntaxException; +import java.util.UUID; +import java.util.concurrent.ThreadPoolExecutor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * @author Stephen Samuel + */ +public class RabbitMQEndpointTest { + + private Envelope envelope = Mockito.mock(Envelope.class); + + @Test + public void testCreatingRabbitExchangeSetsHeaders() throws URISyntaxException { + RabbitMQEndpoint endpoint = + new RabbitMQEndpoint("rabbitmq:localhost/exchange", "localhost/exchange", new RabbitMQComponent()); + + String routingKey = UUID.randomUUID().toString(); + String exchangeName = UUID.randomUUID().toString(); + long tag = UUID.randomUUID().toString().hashCode(); + + Mockito.when(envelope.getRoutingKey()).thenReturn(routingKey); + Mockito.when(envelope.getExchange()).thenReturn(exchangeName); + Mockito.when(envelope.getDeliveryTag()).thenReturn(tag); + + Exchange exchange = endpoint.createRabbitExchange(envelope); + assertEquals(exchangeName, exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME)); + assertEquals(routingKey, exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY)); + assertEquals(tag, exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_TAG)); + } + + @Test + public void creatingExecutorUsesThreadPoolSettings() throws Exception { + + RabbitMQEndpoint endpoint = + new RabbitMQEndpoint("rabbitmq:localhost/exchange", "localhost/exchange", new RabbitMQComponent()); + endpoint.setThreadPoolSize(400); + ThreadPoolExecutor executor = endpoint.createExecutor(); + + assertEquals(400, executor.getCorePoolSize()); + } + + @Test + public void assertSingleton() throws URISyntaxException { + RabbitMQEndpoint endpoint = + new RabbitMQEndpoint("rabbitmq:localhost/exchange", "localhost/exchange", new RabbitMQComponent()); + assertTrue(endpoint.isSingleton()); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java new file mode 100644 index 0000000..b58d728 --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java @@ -0,0 +1,79 @@ +package org.apache.camel.component.rabbitmq; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * @author Stephen Samuel + */ +public class RabbitMQProducerIntTest extends CamelTestSupport { + + private static final Logger logger = LoggerFactory.getLogger(RabbitMQProducerIntTest.class); + private static final String EXCHANGE = "ex1"; + + @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest") + private Endpoint to; + + @Produce(uri = "direct:start") + protected ProducerTemplate template; + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + from("direct:start").to(to); + } + }; + } + + @Test + public void producedMessageIsReceived() throws InterruptedException, IOException { + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + factory.setPort(5672); + factory.setUsername("cameltest"); + factory.setPassword("cameltest"); + factory.setVirtualHost("/"); + Connection conn = factory.newConnection(); + + final List received = new ArrayList(); + + Channel channel = conn.createChannel(); + channel.queueDeclare("sammyq", false, false, true, null); + channel.queueBind("sammyq", EXCHANGE, ""); + channel.basicConsume("sammyq", true, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) throws IOException { + received.add(envelope); + } + }); + + template.sendBodyAndHeader("new message", RabbitMQConstants.EXCHANGE_NAME, "ex1"); + Thread.sleep(500); + assertEquals(1, received.size()); + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java new file mode 100644 index 0000000..26cff06 --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java @@ -0,0 +1,138 @@ +package org.apache.camel.component.rabbitmq; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Connection; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.impl.DefaultMessage; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +import static org.junit.Assert.assertEquals; + +/** + * @author Stephen Samuel + */ +public class RabbitMQProducerTest { + + private RabbitMQEndpoint endpoint = Mockito.mock(RabbitMQEndpoint.class); + private Exchange exchange = Mockito.mock(Exchange.class); + private Message message = new DefaultMessage(); + private Connection conn = Mockito.mock(Connection.class); + + @Before + public void before() throws IOException { + Mockito.when(exchange.getIn()).thenReturn(message); + Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn); + Mockito.when(conn.createChannel()).thenReturn(null); + } + + @Test + public void testPropertiesUsesContentTypeHeader() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.CONTENT_TYPE, "application/json"); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals("application/json", props.getContentType()); + } + + @Test + public void testPropertiesUsesCorrelationHeader() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.CORRELATIONID, "124544"); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals("124544", props.getCorrelationId()); + } + + @Test + public void testPropertiesUsesUserIdHeader() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.USERID, "abcd"); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals("abcd", props.getUserId()); + } + + @Test + public void testPropertiesUsesMessageIdHeader() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.MESSAGE_ID, "abvasweaqQQ"); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals("abvasweaqQQ", props.getMessageId()); + } + + @Test + public void testPropertiesUsesDeliveryModeHeader() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.DELIVERY_MODE, "444"); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals(444, props.getDeliveryMode().intValue()); + } + + @Test + public void testPropertiesUsesClusterIdHeader() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.CLUSTERID, "abtasg5r"); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals("abtasg5r", props.getClusterId()); + } + + @Test + public void testPropertiesUsesReplyToHeader() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.REPLY_TO, "bbbbdfgdfg"); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals("bbbbdfgdfg", props.getReplyTo()); + } + + @Test + public void testPropertiesUsesPriorityHeader() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.PRIORITY, "15"); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals(15, props.getPriority().intValue()); + } + + @Test + public void testPropertiesUsesExpirationHeader() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.EXPIRATION, "thursday"); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals("thursday", props.getExpiration()); + } + + @Test + public void testPropertiesUsesTypeHeader() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.TYPE, "sometype"); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals("sometype", props.getType()); + } + + @Test + public void testPropertiesUsesContentEncodingHeader() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.CONTENT_ENCODING, "qwergghdfdfgdfgg"); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals("qwergghdfdfgdfgg", props.getContentEncoding()); + } + + @Test + public void testPropertiesAppIdHeader() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.APP_ID, "qweeqwe"); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals("qweeqwe", props.getAppId()); + } + + @Test + public void testPropertiesUsesTimestampHeader() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.TIMESTAMP, "12345123"); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals(12345123, props.getTimestamp().getTime()); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/resources/log4j.xml b/components/camel-rabbitmq/src/test/resources/log4j.xml new file mode 100644 index 0000000..7bdcb8a --- /dev/null +++ b/components/camel-rabbitmq/src/test/resources/log4j.xml @@ -0,0 +1,20 @@ +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> +<log4j:configuration threshold="all" debug="true" + xmlns:log4j="http://jakarta.apache.org/log4j/"> + + <appender name="console" class="org.apache.log4j.ConsoleAppender"> + <param name="Target" value="System.out"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="[%t] %-5p %c{1}.%M - %m%n"/> + </layout> + </appender> + + <logger name="org.apache.camel.component.rabbitmq"> + <level value="debug"/> + </logger> + <root> + <level value="warn"/> + <appender-ref ref="console"/> + </root> + +</log4j:configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/pom.xml ---------------------------------------------------------------------- diff --git a/components/pom.xml b/components/pom.xml index e6fd60d..3f5713d 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -137,6 +137,7 @@ <module>camel-protobuf</module> <module>camel-quartz</module> <module>camel-quickfix</module> + <module>camel-rabbitmq</module> <module>camel-restlet</module> <module>camel-rmi</module> <module>camel-routebox</module>