Updated Branches:
  refs/heads/master 4eed66cdf -> bf1f5f0cc

CAMEL-6534 Added camel-rabbitmq component with thanks to stephen


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0e2498b1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0e2498b1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0e2498b1

Branch: refs/heads/master
Commit: 0e2498b12ceb74916d60bbeebbb86a5912315a10
Parents: 4eed66c
Author: Willem Jiang <ningji...@apache.org>
Authored: Mon Jul 15 18:43:15 2013 +0800
Committer: Willem Jiang <ningji...@apache.org>
Committed: Mon Jul 15 18:59:58 2013 +0800

----------------------------------------------------------------------
 components/camel-rabbitmq/pom.xml               | 122 ++++++++++++++
 .../component/rabbitmq/RabbitMQComponent.java   |  28 ++++
 .../component/rabbitmq/RabbitMQConstants.java   |  23 +++
 .../component/rabbitmq/RabbitMQConsumer.java    | 110 ++++++++++++
 .../component/rabbitmq/RabbitMQEndpoint.java    | 167 +++++++++++++++++++
 .../component/rabbitmq/RabbitMQProducer.java    | 104 ++++++++++++
 .../org/apache/camel/component/rabbitmq         |   1 +
 .../rabbitmq/RabbitMQComponentTest.java         |  44 +++++
 .../rabbitmq/RabbitMQConsumerIntTest.java       |  64 +++++++
 .../rabbitmq/RabbitMQConsumerTest.java          |  56 +++++++
 .../rabbitmq/RabbitMQEndpointTest.java          |  58 +++++++
 .../rabbitmq/RabbitMQProducerIntTest.java       |  79 +++++++++
 .../rabbitmq/RabbitMQProducerTest.java          | 138 +++++++++++++++
 .../camel-rabbitmq/src/test/resources/log4j.xml |  20 +++
 components/pom.xml                              |   1 +
 15 files changed, 1015 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/pom.xml 
b/components/camel-rabbitmq/pom.xml
new file mode 100644
index 0000000..338df36
--- /dev/null
+++ b/components/camel-rabbitmq/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+    
+    http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components</artifactId>
+    <version>2.12-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.camel</groupId>
+  <artifactId>camel-rabbitmq</artifactId>
+  <packaging>bundle</packaging>
+  <name>Camel :: RabbitMQ</name>
+  <description>Camel RabbitMQ Component</description>
+  
+  <properties>
+        <camel.osgi.export.pkg>
+            org.apache.camel.component.rabbitmq.*
+        </camel.osgi.export.pkg>
+        
<camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=rabbitmq</camel.osgi.export.service>
+  </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>3.1.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test</artifactId>
+           <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.9.5</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.0</version>
+                <configuration>
+                    <source>1.6</source>
+                    <target>1.6</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>**/*IntTest*</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>itest</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <configuration>
+                            <excludes>
+                                <exclude>None</exclude>
+                            </excludes>
+                            <includes>
+                                <include>**/*IntTest*</include>
+                            </includes>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
new file mode 100644
index 0000000..7055a6f
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -0,0 +1,28 @@
+package org.apache.camel.component.rabbitmq;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultComponent;
+
+import java.util.Map;
+
+/**
+ * @author Stephen Samuel
+ */
+public class RabbitMQComponent extends DefaultComponent {
+
+    public RabbitMQComponent() {
+    }
+
+    public RabbitMQComponent(CamelContext context) {
+        super(context);
+    }
+
+    @Override
+    protected RabbitMQEndpoint createEndpoint(String uri,
+                                              String remaining,
+                                              Map<String, Object> params) 
throws Exception {
+        RabbitMQEndpoint endpoint = new RabbitMQEndpoint(uri, remaining, this);
+        setProperties(endpoint, params);
+        return endpoint;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
new file mode 100644
index 0000000..d163361
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
@@ -0,0 +1,23 @@
+package org.apache.camel.component.rabbitmq;
+
+/**
+ * @author Stephen Samuel
+ */
+public class RabbitMQConstants {
+    public static final String ROUTING_KEY = "rabbitmq.ROUTING_KEY";
+    public static final String EXCHANGE_NAME = "rabbitmq.EXCHANGE_NAME";
+    public static final String CONTENT_TYPE = "rabbitmq.CONTENT_TYPE";
+    public static final String PRIORITY = "rabbitmq.PRIORITY";
+    public static final String DELIVERY_TAG = "rabbitmq.DELIVERY_TAG";
+    public static final String CORRELATIONID = "rabbitmq.CORRELATIONID";
+    public static final String MESSAGE_ID = "rabbitmq.MESSAGE_ID";
+    public static final String DELIVERY_MODE = "rabbitmq.DELIVERY_MODE";
+    public static final String USERID = "rabbitmq.USERID";
+    public static final String CLUSTERID = "rabbitmq.CLUSTERID";
+    public static final String REPLY_TO = "rabbitmq.REPLY_TO";
+    public static final String CONTENT_ENCODING = "rabbitmq.CONTENT_ENCODING";
+    public static final String TYPE = "rabbitmq.TYPE";
+    public static final String EXPIRATION = "rabbitmq.EXPIRATION";
+    public static final String TIMESTAMP = "rabbitmq.TIMESTAMP";
+    public static final String APP_ID = "rabbitmq.APP_ID";
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
new file mode 100644
index 0000000..e088568
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -0,0 +1,110 @@
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Envelope;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * @author Stephen Samuel
+ */
+public class RabbitMQConsumer extends DefaultConsumer {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(RabbitMQConsumer.class);
+
+    private final RabbitMQEndpoint endpoint;
+
+    ExecutorService executor;
+    Connection conn;
+    Channel channel;
+
+    public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        log.info("Starting RabbitMQ consumer");
+
+        executor = endpoint.createExecutor();
+        logger.debug("Using executor {}", executor);
+
+        conn = endpoint.connect(executor);
+        logger.debug("Using conn {}", conn);
+
+        channel = conn.createChannel();
+        logger.debug("Using channel {}", channel);
+
+        channel.exchangeDeclare(endpoint.getExchangeName(), "direct", true);
+        channel.queueDeclare(endpoint.getQueue(), true, false, false, null);
+        channel.queueBind(endpoint.getQueue(), endpoint.getExchangeName(),
+                endpoint.getRoutingKey() == null ? "" : 
endpoint.getRoutingKey());
+
+        channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), new 
RabbitConsumer(this, channel));
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        log.info("Stopping RabbitMQ consumer");
+        if (conn != null)
+            try {
+                conn.close();
+            } catch (Exception ignored) { }
+
+        channel = null;
+        conn = null;
+        executor.shutdown();
+        executor = null;
+    }
+
+    class RabbitConsumer extends com.rabbitmq.client.DefaultConsumer {
+
+        private final RabbitMQConsumer consumer;
+        private final Channel channel;
+
+        /**
+         * Constructs a new instance and records its association to the 
passed-in channel.
+         *
+         * @param channel the channel to which this consumer is attached
+         */
+        public RabbitConsumer(RabbitMQConsumer consumer, Channel channel) {
+            super(channel);
+            this.consumer = consumer;
+            this.channel = channel;
+        }
+
+        @Override
+        public void handleDelivery(String consumerTag,
+                                   Envelope envelope,
+                                   AMQP.BasicProperties properties,
+                                   byte[] body)
+                throws IOException {
+
+            Exchange exchange = 
consumer.endpoint.createRabbitExchange(envelope);
+            logger.trace("Created exchange [exchange={}]", new 
Object[]{exchange});
+
+            try {
+                consumer.getProcessor().process(exchange);
+
+                long deliveryTag = envelope.getDeliveryTag();
+                logger.trace("Acknowleding receipt [delivery_tag={}]", 
deliveryTag);
+                channel.basicAck(deliveryTag, false);
+
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
new file mode 100644
index 0000000..94ec68c
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -0,0 +1,167 @@
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Envelope;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultMessage;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * @author Stephen Samuel
+ */
+public class RabbitMQEndpoint extends DefaultEndpoint {
+
+    private String username;
+    private String password;
+    private String vhost;
+    private String hostname;
+    private int threadPoolSize = 10;
+    private int portNumber;
+    private boolean autoAck = true;
+    private String queue = 
String.valueOf(UUID.randomUUID().toString().hashCode());
+    private String exchangeName;
+    private String routingKey;
+
+    public String getExchangeName() {
+        return exchangeName;
+    }
+
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
+
+    public int getThreadPoolSize() {
+        return threadPoolSize;
+    }
+
+    public void setThreadPoolSize(int threadPoolSize) {
+        this.threadPoolSize = threadPoolSize;
+    }
+
+    public boolean isAutoAck() {
+        return autoAck;
+    }
+
+    public void setAutoAck(boolean autoAck) {
+        this.autoAck = autoAck;
+    }
+
+    public String getQueue() {
+        return queue;
+    }
+
+    public String getRoutingKey() {
+        return routingKey;
+    }
+
+    public void setRoutingKey(String routingKey) {
+        this.routingKey = routingKey;
+    }
+
+    public RabbitMQEndpoint() {
+    }
+
+    public RabbitMQEndpoint(String endpointUri,
+                            String remaining,
+                            RabbitMQComponent component) throws 
URISyntaxException {
+        super(endpointUri, component);
+
+        URI uri = new URI("http://"; + remaining);
+        hostname = uri.getHost();
+        portNumber = uri.getPort();
+        exchangeName = uri.getPath().substring(1);
+    }
+
+    public Exchange createRabbitExchange(Envelope envelope) {
+        Exchange exchange = new DefaultExchange(getCamelContext(), 
getExchangePattern());
+
+        Message message = new DefaultMessage();
+        exchange.setIn(message);
+
+        message.setHeader(RabbitMQConstants.ROUTING_KEY, 
envelope.getRoutingKey());
+        message.setHeader(RabbitMQConstants.EXCHANGE_NAME, 
envelope.getExchange());
+        message.setHeader(RabbitMQConstants.DELIVERY_TAG, 
envelope.getDeliveryTag());
+
+        return exchange;
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        RabbitMQConsumer consumer = new RabbitMQConsumer(this, processor);
+        configureConsumer(consumer);
+        return consumer;
+    }
+
+    public Connection connect(ExecutorService executor) throws IOException {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setUsername(getUsername());
+        factory.setPassword(getPassword());
+        if (getVhost() == null)
+            factory.setVirtualHost("/");
+        else
+            factory.setVirtualHost(getVhost());
+        factory.setHost(getHostname());
+        factory.setPort(getPortNumber());
+        return factory.newConnection(executor);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new RabbitMQProducer(this);
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public int getPortNumber() {
+        return portNumber;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public String getVhost() {
+        return vhost;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public void setVhost(String vhost) {
+        this.vhost = vhost;
+    }
+
+    public ThreadPoolExecutor createExecutor() {
+        return (ThreadPoolExecutor) 
Executors.newFixedThreadPool(getThreadPoolSize());
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
new file mode 100644
index 0000000..dcfdb2e
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -0,0 +1,104 @@
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.concurrent.Executors;
+
+/**
+ * @author Stephen Samuel
+ */
+public class RabbitMQProducer extends DefaultProducer {
+
+    private final RabbitMQEndpoint endpoint;
+    private Connection conn;
+    private Channel channel;
+
+    public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
+        super(endpoint);
+        this.endpoint = endpoint;
+        this.conn = endpoint.connect(Executors.newSingleThreadExecutor());
+        this.channel = conn.createChannel();
+    }
+
+    public void shutdown() throws IOException {
+        conn.close();
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+
+        Object key = exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY);
+        String exchangeName = 
exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME).toString();
+        byte[] messageBodyBytes = exchange.getIn().getBody(byte[].class);
+        AMQP.BasicProperties.Builder properties = buildProperties(exchange);
+
+        channel.basicPublish(exchangeName,
+                key == null ? "" : key.toString(),
+                properties.build(),
+                messageBodyBytes);
+    }
+
+    AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
+        AMQP.BasicProperties.Builder properties = new 
AMQP.BasicProperties.Builder();
+
+        final Object contentType = 
exchange.getIn().getHeader(RabbitMQConstants.CONTENT_TYPE);
+        if (contentType != null)
+            properties.contentType(contentType.toString());
+
+        final Object priority = 
exchange.getIn().getHeader(RabbitMQConstants.PRIORITY);
+        if (priority != null)
+            properties.priority(Integer.parseInt(priority.toString()));
+
+        final Object messageId = 
exchange.getIn().getHeader(RabbitMQConstants.MESSAGE_ID);
+        if (messageId != null)
+            properties.messageId(messageId.toString());
+
+        final Object clusterId = 
exchange.getIn().getHeader(RabbitMQConstants.CLUSTERID);
+        if (clusterId != null)
+            properties.clusterId(clusterId.toString());
+
+        final Object replyTo = 
exchange.getIn().getHeader(RabbitMQConstants.REPLY_TO);
+        if (replyTo != null)
+            properties.replyTo(replyTo.toString());
+
+        final Object correlationId = 
exchange.getIn().getHeader(RabbitMQConstants.CORRELATIONID);
+        if (correlationId != null)
+            properties.correlationId(correlationId.toString());
+
+        final Object deliveryMode = 
exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_MODE);
+        if (deliveryMode != null)
+            properties.deliveryMode(Integer.parseInt(deliveryMode.toString()));
+
+        final Object userId = 
exchange.getIn().getHeader(RabbitMQConstants.USERID);
+        if (userId != null)
+            properties.userId(userId.toString());
+
+        final Object type = exchange.getIn().getHeader(RabbitMQConstants.TYPE);
+        if (type != null)
+            properties.type(type.toString());
+
+        final Object contentEncoding = 
exchange.getIn().getHeader(RabbitMQConstants.CONTENT_ENCODING);
+        if (contentEncoding != null)
+            properties.contentEncoding(contentEncoding.toString());
+
+        final Object expiration = 
exchange.getIn().getHeader(RabbitMQConstants.EXPIRATION);
+        if (expiration != null)
+            properties.expiration(expiration.toString());
+
+        final Object appId = 
exchange.getIn().getHeader(RabbitMQConstants.APP_ID);
+        if (appId != null)
+            properties.appId(appId.toString());
+
+        final Object timestamp = 
exchange.getIn().getHeader(RabbitMQConstants.TIMESTAMP);
+        if (timestamp != null)
+            properties.timestamp(new 
Date(Long.parseLong(timestamp.toString())));
+
+        return properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/main/resources/META-INF/services/org/apache/camel/component/rabbitmq
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/resources/META-INF/services/org/apache/camel/component/rabbitmq
 
b/components/camel-rabbitmq/src/main/resources/META-INF/services/org/apache/camel/component/rabbitmq
new file mode 100644
index 0000000..3211140
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/main/resources/META-INF/services/org/apache/camel/component/rabbitmq
@@ -0,0 +1 @@
+class=org.apache.camel.component.rabbitmq.RabbitMQComponent
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
new file mode 100644
index 0000000..07b068f
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
@@ -0,0 +1,44 @@
+package org.apache.camel.component.rabbitmq;
+
+import org.apache.camel.CamelContext;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Stephen Samuel
+ */
+public class RabbitMQComponentTest {
+
+    private CamelContext context = Mockito.mock(CamelContext.class);
+
+    @Test
+    public void testPropertiesSet() throws Exception {
+        Map<String, Object> params = new HashMap<String, Object>();
+        params.put("username", "coldplay");
+        params.put("password", "chrism");
+        params.put("autoAck", true);
+        params.put("vhost", "vman");
+        params.put("threadPoolSize", 515);
+        params.put("portNumber", 14123);
+        params.put("hostname", "special.host");
+        params.put("queue", "queuey");
+
+        String uri = "rabbitmq:special.host:14/queuey";
+        String remaining = "special.host:14/queuey";
+
+        RabbitMQEndpoint endpoint = new 
RabbitMQComponent(context).createEndpoint(uri, remaining, params);
+        assertEquals("chrism", endpoint.getPassword());
+        assertEquals("coldplay", endpoint.getUsername());
+        assertEquals("queuey", endpoint.getQueue());
+        assertEquals("vman", endpoint.getVhost());
+        assertEquals("special.host", endpoint.getHostname());
+        assertEquals(14, endpoint.getPortNumber());
+        assertEquals(515, endpoint.getThreadPoolSize());
+        assertEquals(true, endpoint.isAutoAck());
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java
new file mode 100644
index 0000000..6f4dfa1
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java
@@ -0,0 +1,64 @@
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * @author Stephen Samuel
+ */
+public class RabbitMQConsumerIntTest extends CamelTestSupport {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(RabbitMQConsumerIntTest.class);
+    private static final String EXCHANGE = "ex1";
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + 
"?username=cameltest&password=cameltest")
+    private Endpoint from;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint to;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from(from).to(to);
+            }
+        };
+    }
+
+    @Test
+    public void sentMessageIsReceived() throws InterruptedException, 
IOException {
+
+        to.expectedMessageCount(1);
+
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("localhost");
+        factory.setPort(5672);
+        factory.setUsername("cameltest");
+        factory.setPassword("cameltest");
+        factory.setVirtualHost("/");
+        Connection conn = factory.newConnection();
+
+        AMQP.BasicProperties.Builder properties = new 
AMQP.BasicProperties.Builder();
+
+        Channel channel = conn.createChannel();
+        channel.basicPublish(EXCHANGE, "", properties.build(), "hello 
world".getBytes());
+
+        to.assertIsSatisfied();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
new file mode 100644
index 0000000..26af07b
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
@@ -0,0 +1,56 @@
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.camel.Processor;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Stephen Samuel
+ */
+public class RabbitMQConsumerTest {
+
+    private RabbitMQEndpoint endpoint = Mockito.mock(RabbitMQEndpoint.class);
+    private Connection conn = Mockito.mock(Connection.class);
+    private Processor processor = Mockito.mock(Processor.class);
+    private Channel channel = Mockito.mock(Channel.class);
+
+    @Test
+    public void testStoppingConsumerShutsdownExecutor() throws Exception {
+        RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
+
+        ThreadPoolExecutor e = (ThreadPoolExecutor) 
Executors.newFixedThreadPool(3);
+        Mockito.when(endpoint.createExecutor()).thenReturn(e);
+        
Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
+        Mockito.when(conn.createChannel()).thenReturn(channel);
+
+        consumer.doStart();
+        assertFalse(e.isShutdown());
+
+        consumer.doStop();
+        assertTrue(e.isShutdown());
+    }
+
+    @Test
+    public void testStoppingConsumerShutsdownConnection() throws Exception {
+        RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
+
+        
Mockito.when(endpoint.createExecutor()).thenReturn((ThreadPoolExecutor) 
Executors.newFixedThreadPool(3));
+        
Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
+        Mockito.when(conn.createChannel()).thenReturn(channel);
+
+        consumer.doStart();
+        consumer.doStop();
+
+        Mockito.verify(conn).close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
new file mode 100644
index 0000000..dbdb34b
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -0,0 +1,58 @@
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.Envelope;
+import org.apache.camel.Exchange;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.net.URISyntaxException;
+import java.util.UUID;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Stephen Samuel
+ */
+public class RabbitMQEndpointTest {
+
+    private Envelope envelope = Mockito.mock(Envelope.class);
+
+    @Test
+    public void testCreatingRabbitExchangeSetsHeaders() throws 
URISyntaxException {
+        RabbitMQEndpoint endpoint =
+                new RabbitMQEndpoint("rabbitmq:localhost/exchange", 
"localhost/exchange", new RabbitMQComponent());
+
+        String routingKey = UUID.randomUUID().toString();
+        String exchangeName = UUID.randomUUID().toString();
+        long tag = UUID.randomUUID().toString().hashCode();
+
+        Mockito.when(envelope.getRoutingKey()).thenReturn(routingKey);
+        Mockito.when(envelope.getExchange()).thenReturn(exchangeName);
+        Mockito.when(envelope.getDeliveryTag()).thenReturn(tag);
+
+        Exchange exchange = endpoint.createRabbitExchange(envelope);
+        assertEquals(exchangeName, 
exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME));
+        assertEquals(routingKey, 
exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY));
+        assertEquals(tag, 
exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_TAG));
+    }
+
+    @Test
+    public void creatingExecutorUsesThreadPoolSettings() throws Exception {
+
+        RabbitMQEndpoint endpoint =
+                new RabbitMQEndpoint("rabbitmq:localhost/exchange", 
"localhost/exchange", new RabbitMQComponent());
+        endpoint.setThreadPoolSize(400);
+        ThreadPoolExecutor executor = endpoint.createExecutor();
+
+        assertEquals(400, executor.getCorePoolSize());
+    }
+
+    @Test
+    public void assertSingleton() throws URISyntaxException {
+        RabbitMQEndpoint endpoint =
+                new RabbitMQEndpoint("rabbitmq:localhost/exchange", 
"localhost/exchange", new RabbitMQComponent());
+        assertTrue(endpoint.isSingleton());
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
new file mode 100644
index 0000000..b58d728
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
@@ -0,0 +1,79 @@
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Stephen Samuel
+ */
+public class RabbitMQProducerIntTest extends CamelTestSupport {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(RabbitMQProducerIntTest.class);
+    private static final String EXCHANGE = "ex1";
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + 
"?username=cameltest&password=cameltest")
+    private Endpoint to;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate template;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to(to);
+            }
+        };
+    }
+
+    @Test
+    public void producedMessageIsReceived() throws InterruptedException, 
IOException {
+
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("localhost");
+        factory.setPort(5672);
+        factory.setUsername("cameltest");
+        factory.setPassword("cameltest");
+        factory.setVirtualHost("/");
+        Connection conn = factory.newConnection();
+
+        final List received = new ArrayList();
+
+        Channel channel = conn.createChannel();
+        channel.queueDeclare("sammyq", false, false, true, null);
+        channel.queueBind("sammyq", EXCHANGE, "");
+        channel.basicConsume("sammyq", true, new DefaultConsumer(channel) {
+            @Override
+            public void handleDelivery(String consumerTag,
+                                       Envelope envelope,
+                                       AMQP.BasicProperties properties,
+                                       byte[] body) throws IOException {
+                received.add(envelope);
+            }
+        });
+
+        template.sendBodyAndHeader("new message", 
RabbitMQConstants.EXCHANGE_NAME, "ex1");
+        Thread.sleep(500);
+        assertEquals(1, received.size());
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
new file mode 100644
index 0000000..26cff06
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
@@ -0,0 +1,138 @@
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Connection;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultMessage;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Stephen Samuel
+ */
+public class RabbitMQProducerTest {
+
+    private RabbitMQEndpoint endpoint = Mockito.mock(RabbitMQEndpoint.class);
+    private Exchange exchange = Mockito.mock(Exchange.class);
+    private Message message = new DefaultMessage();
+    private Connection conn = Mockito.mock(Connection.class);
+
+    @Before
+    public void before() throws IOException {
+        Mockito.when(exchange.getIn()).thenReturn(message);
+        
Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
+        Mockito.when(conn.createChannel()).thenReturn(null);
+    }
+
+    @Test
+    public void testPropertiesUsesContentTypeHeader() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        message.setHeader(RabbitMQConstants.CONTENT_TYPE, "application/json");
+        AMQP.BasicProperties props = 
producer.buildProperties(exchange).build();
+        assertEquals("application/json", props.getContentType());
+    }
+
+    @Test
+    public void testPropertiesUsesCorrelationHeader() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        message.setHeader(RabbitMQConstants.CORRELATIONID, "124544");
+        AMQP.BasicProperties props = 
producer.buildProperties(exchange).build();
+        assertEquals("124544", props.getCorrelationId());
+    }
+
+    @Test
+    public void testPropertiesUsesUserIdHeader() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        message.setHeader(RabbitMQConstants.USERID, "abcd");
+        AMQP.BasicProperties props = 
producer.buildProperties(exchange).build();
+        assertEquals("abcd", props.getUserId());
+    }
+
+    @Test
+    public void testPropertiesUsesMessageIdHeader() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        message.setHeader(RabbitMQConstants.MESSAGE_ID, "abvasweaqQQ");
+        AMQP.BasicProperties props = 
producer.buildProperties(exchange).build();
+        assertEquals("abvasweaqQQ", props.getMessageId());
+    }
+
+    @Test
+    public void testPropertiesUsesDeliveryModeHeader() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        message.setHeader(RabbitMQConstants.DELIVERY_MODE, "444");
+        AMQP.BasicProperties props = 
producer.buildProperties(exchange).build();
+        assertEquals(444, props.getDeliveryMode().intValue());
+    }
+
+    @Test
+    public void testPropertiesUsesClusterIdHeader() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        message.setHeader(RabbitMQConstants.CLUSTERID, "abtasg5r");
+        AMQP.BasicProperties props = 
producer.buildProperties(exchange).build();
+        assertEquals("abtasg5r", props.getClusterId());
+    }
+
+    @Test
+    public void testPropertiesUsesReplyToHeader() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        message.setHeader(RabbitMQConstants.REPLY_TO, "bbbbdfgdfg");
+        AMQP.BasicProperties props = 
producer.buildProperties(exchange).build();
+        assertEquals("bbbbdfgdfg", props.getReplyTo());
+    }
+
+    @Test
+    public void testPropertiesUsesPriorityHeader() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        message.setHeader(RabbitMQConstants.PRIORITY, "15");
+        AMQP.BasicProperties props = 
producer.buildProperties(exchange).build();
+        assertEquals(15, props.getPriority().intValue());
+    }
+
+    @Test
+    public void testPropertiesUsesExpirationHeader() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        message.setHeader(RabbitMQConstants.EXPIRATION, "thursday");
+        AMQP.BasicProperties props = 
producer.buildProperties(exchange).build();
+        assertEquals("thursday", props.getExpiration());
+    }
+
+    @Test
+    public void testPropertiesUsesTypeHeader() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        message.setHeader(RabbitMQConstants.TYPE, "sometype");
+        AMQP.BasicProperties props = 
producer.buildProperties(exchange).build();
+        assertEquals("sometype", props.getType());
+    }
+
+    @Test
+    public void testPropertiesUsesContentEncodingHeader() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        message.setHeader(RabbitMQConstants.CONTENT_ENCODING, 
"qwergghdfdfgdfgg");
+        AMQP.BasicProperties props = 
producer.buildProperties(exchange).build();
+        assertEquals("qwergghdfdfgdfgg", props.getContentEncoding());
+    }
+
+    @Test
+    public void testPropertiesAppIdHeader() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        message.setHeader(RabbitMQConstants.APP_ID, "qweeqwe");
+        AMQP.BasicProperties props = 
producer.buildProperties(exchange).build();
+        assertEquals("qweeqwe", props.getAppId());
+    }
+
+    @Test
+    public void testPropertiesUsesTimestampHeader() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        message.setHeader(RabbitMQConstants.TIMESTAMP, "12345123");
+        AMQP.BasicProperties props = 
producer.buildProperties(exchange).build();
+        assertEquals(12345123, props.getTimestamp().getTime());
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/camel-rabbitmq/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/resources/log4j.xml 
b/components/camel-rabbitmq/src/test/resources/log4j.xml
new file mode 100644
index 0000000..7bdcb8a
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/resources/log4j.xml
@@ -0,0 +1,20 @@
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration threshold="all" debug="true"
+                     xmlns:log4j="http://jakarta.apache.org/log4j/";>
+
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%t] %-5p %c{1}.%M - %m%n"/>
+        </layout>
+    </appender>
+
+    <logger name="org.apache.camel.component.rabbitmq">
+        <level value="debug"/>
+    </logger>
+    <root>
+        <level value="warn"/>
+        <appender-ref ref="console"/>
+    </root>
+
+</log4j:configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/0e2498b1/components/pom.xml
----------------------------------------------------------------------
diff --git a/components/pom.xml b/components/pom.xml
index e6fd60d..3f5713d 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -137,6 +137,7 @@
     <module>camel-protobuf</module>
     <module>camel-quartz</module>
     <module>camel-quickfix</module>
+    <module>camel-rabbitmq</module>
     <module>camel-restlet</module>
     <module>camel-rmi</module>
     <module>camel-routebox</module>

Reply via email to