Repository: camel Updated Branches: refs/heads/master cbdfe050e -> a8dfb0359
CAMEL-7421 Adding Channel pooling in RabbitMQProducer Fix RabbitMQSpringIntTest Replace custom object pool by Commons Pool Fix Spring integration test again Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3ad50186 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3ad50186 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3ad50186 Branch: refs/heads/master Commit: 3ad501862aa11d16fe882780a5e0456c37b8c1b3 Parents: cbdfe05 Author: Gerald Quintana <gerald.quint...@zenika.com> Authored: Wed May 14 16:35:21 2014 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Dec 4 07:29:09 2014 +0100 ---------------------------------------------------------------------- components/camel-rabbitmq/pom.xml | 214 +++++++-------- .../component/rabbitmq/RabbitMQProducer.java | 131 ++++++++-- .../rabbitmq/pool/PoolableChannelFactory.java | 59 +++++ .../rabbitmq/RabbitMQSpringIntTest.java | 257 ++++++++++--------- .../rabbitmq/RabbitMQSpringIntTest-context.xml | 7 +- 5 files changed, 410 insertions(+), 258 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml index 4c82a15..675a07a 100644 --- a/components/camel-rabbitmq/pom.xml +++ b/components/camel-rabbitmq/pom.xml @@ -1,107 +1,107 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.camel</groupId> - <artifactId>components</artifactId> - <version>2.15-SNAPSHOT</version> - </parent> - - <artifactId>camel-rabbitmq</artifactId> - <packaging>bundle</packaging> - <name>Camel :: RabbitMQ</name> - <description>Camel RabbitMQ Component</description> - - <properties> - <camel.osgi.export.pkg> - org.apache.camel.component.rabbitmq.* - </camel.osgi.export.pkg> - <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=rabbitmq</camel.osgi.export.service> - </properties> - - <dependencies> - <dependency> - <groupId>com.rabbitmq</groupId> - <artifactId>amqp-client</artifactId> - <version>${rabbitmq-amqp-client-version}</version> - </dependency> - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-core</artifactId> - </dependency> - - <!-- testing --> - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-test-spring</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-core-xml</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <excludes> - <exclude>**/*IntTest*</exclude> - </excludes> - </configuration> - </plugin> - </plugins> - </build> - - <profiles> - <profile> - <id>itest</id> - <build> - <plugins> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <excludes> - <exclude>None</exclude> - </excludes> - <includes> - <include>**/*IntTest*</include> - </includes> - </configuration> - </plugin> - </plugins> - </build> - </profile> - </profiles> - -</project> +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.15-SNAPSHOT</version> + </parent> + + <artifactId>camel-rabbitmq</artifactId> + <packaging>bundle</packaging> + <name>Camel :: RabbitMQ</name> + <description>Camel RabbitMQ Component</description> + + <properties> + <camel.osgi.export.pkg> + org.apache.camel.component.rabbitmq.* + </camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=rabbitmq</camel.osgi.export.service> + </properties> + + <dependencies> + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + <version>${rabbitmq-amqp-client-version}</version> + </dependency> + <dependency> + <groupId>commons-pool</groupId> + <artifactId>commons-pool</artifactId> + <version>${commons-pool-version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + + <!-- testing --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-spring</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/*IntTest*</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>itest</id> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/*.xml</exclude> + </excludes> + <includes> + <include>**/*IntTest*</include> + </includes> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index f5c7eb4..755fa93 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -16,13 +16,6 @@ */ package org.apache.camel.component.rabbitmq; -import java.io.IOException; -import java.math.BigDecimal; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutorService; - import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -30,13 +23,30 @@ import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.util.ObjectHelper; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import org.apache.camel.component.rabbitmq.pool.PoolableChannelFactory; +import org.apache.commons.pool.ObjectPool; +import org.apache.commons.pool.impl.GenericObjectPool; + public class RabbitMQProducer extends DefaultProducer { private int closeTimeout = 30 * 1000; private Connection conn; - private Channel channel; + /** + * Maximum number of opened channel in pool + */ + private int channelPoolMaxSize = 10; + /** + * Maximum time (in milliseconds) waiting for channel + */ + private long channelPoolMaxWait = 1000; + private ObjectPool<Channel> channelPool; private ExecutorService executorService; - public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException { super(endpoint); } @@ -46,19 +56,41 @@ public class RabbitMQProducer extends DefaultProducer { return (RabbitMQEndpoint) super.getEndpoint(); } /** - * Open connection and channel + * Channel callback (similar to Spring JDBC ConnectionCallback) + */ + private static interface ChannelCallback<T> { + public T doWithChannel(Channel channel) throws Exception; + } + /** + * Do something with a pooled channel (similar to Spring JDBC TransactionTemplate#execute) + */ + private <T> T execute(ChannelCallback<T> callback) throws Exception { + Channel channel = channelPool.borrowObject(); + try { + return callback.doWithChannel(channel); + } finally { + channelPool.returnObject(channel); + } + } + /** + * Open connection and initialize channel pool */ - private void openConnectionAndChannel() throws IOException { + private void openConnectionAndChannelPool() throws Exception { log.trace("Creating connection..."); this.conn = getEndpoint().connect(executorService); log.debug("Created connection: {}", conn); - log.trace("Creating channel..."); - this.channel = conn.createChannel(); - log.debug("Created channel: {}", channel); + log.trace("Creating channel pool..."); + channelPool = new GenericObjectPool<>(new PoolableChannelFactory(this.conn), getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait()); if (getEndpoint().isDeclare()) { - getEndpoint().declareExchangeAndQueue(this.channel); - } + execute(new ChannelCallback<Void>() { + @Override + public Void doWithChannel(Channel channel) throws Exception { + getEndpoint().declareExchangeAndQueue(channel); + return null; + } + }); + } } @Override @@ -66,7 +98,7 @@ public class RabbitMQProducer extends DefaultProducer { this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]"); try { - openConnectionAndChannel(); + openConnectionAndChannelPool(); } catch (IOException e) { log.warn("Failed to create connection", e); } @@ -75,12 +107,8 @@ public class RabbitMQProducer extends DefaultProducer { /** * If needed, close Connection and Channel */ - private void closeConnectionAndChannel() throws IOException { - if (channel != null) { - log.debug("Closing channel: {}", channel); - channel.close(); - channel = null; - } + private void closeConnectionAndChannel() throws Exception { + channelPool.close(); if (conn != null) { log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout); conn.close(closeTimeout); @@ -113,13 +141,30 @@ public class RabbitMQProducer extends DefaultProducer { throw new IllegalArgumentException("ExchangeName and RoutingKey is not provided in the endpoint: " + getEndpoint()); } byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class); - AMQP.BasicProperties.Builder properties = buildProperties(exchange); + AMQP.BasicProperties properties = buildProperties(exchange).build(); - if (channel == null) { + basicPublish(exchangeName, key, properties, messageBodyBytes); + } + + /** + * Send a message borrowing a channel from the pool + * @param exchange Target exchange + * @param routingKey Routing key + * @param properties Header properties + * @param body Body content + */ + private void basicPublish(final String exchange, final String routingKey, final AMQP.BasicProperties properties, final byte[] body) throws Exception { + if (channelPool==null) { // Open connection and channel lazily - openConnectionAndChannel(); + openConnectionAndChannelPool(); } - channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes); + execute(new ChannelCallback<Void>() { + @Override + public Void doWithChannel(Channel channel) throws Exception { + channel.basicPublish(exchange, routingKey, properties, body); + return null; + } + }); } AMQP.BasicProperties.Builder buildProperties(Exchange exchange) { @@ -246,4 +291,36 @@ public class RabbitMQProducer extends DefaultProducer { public void setCloseTimeout(int closeTimeout) { this.closeTimeout = closeTimeout; } + + /** + * Get maximum number of opened channel in pool + * @return Maximum number of opened channel in pool + */ + public int getChannelPoolMaxSize() { + return channelPoolMaxSize; + } + + /** + * Set maximum number of opened channel in pool + * @param channelPoolMaxSize Maximum number of opened channel in pool + */ + public void setChannelPoolMaxSize(int channelPoolMaxSize) { + this.channelPoolMaxSize = channelPoolMaxSize; + } + + /** + * Get the maximum number of milliseconds to wait for a channel from the pool + * @return Maximum number of milliseconds waiting for a channel + */ + public long getChannelPoolMaxWait() { + return channelPoolMaxWait; + } + + /** + * Set the maximum number of milliseconds to wait for a channel from the pool + * @param channelPoolMaxWait Maximum number of milliseconds waiting for a channel + */ + public void setChannelPoolMaxWait(long channelPoolMaxWait) { + this.channelPoolMaxWait = channelPoolMaxWait; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java new file mode 100644 index 0000000..b9bed13 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java @@ -0,0 +1,59 @@ +/* + * Copyright 2014 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.rabbitmq.pool; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.apache.commons.pool.PoolableObjectFactory; + +/** + * Channel lifecyle manager: create, check and close channel + */ +public class PoolableChannelFactory implements PoolableObjectFactory<Channel> { + /** + * Parent connection + */ + private final Connection connection; + + public PoolableChannelFactory(Connection connection) { + this.connection = connection; + } + + @Override + public Channel makeObject() throws Exception { + return connection.createChannel(); + } + + @Override + public void destroyObject(Channel t) throws Exception { + t.close(); + } + + @Override + public boolean validateObject(Channel t) { + return t.isOpen(); + } + + @Override + public void activateObject(Channel t) throws Exception { + } + + @Override + public void passivateObject(Channel t) throws Exception { + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java index 6119082..f65b909 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java @@ -1,122 +1,135 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.rabbitmq; - -import java.io.IOException; - -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.DefaultConsumer; -import com.rabbitmq.client.Envelope; - -import org.apache.camel.Produce; -import org.apache.camel.ProducerTemplate; -import org.apache.camel.test.spring.CamelSpringJUnit4ClassRunner; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import static org.junit.Assert.assertEquals; -/** - * Test RabbitMQ component with Spring DSL - */ -@RunWith(CamelSpringJUnit4ClassRunner.class) -@ContextConfiguration("RabbitMQSpringIntTest-context.xml") -public class RabbitMQSpringIntTest { - @Produce(uri = "direct:rabbitMQ") - protected ProducerTemplate template; - @Autowired - private ConnectionFactory connectionFactory; - private Connection connection; - private Channel channel; - - private Connection openConnection() throws IOException { - if (connection == null) { - connection = connectionFactory.newConnection(); - } - return connection; - } - - private Channel openChannel() throws IOException { - if (channel == null) { - channel = openConnection().createChannel(); - } - return channel; - } - - @Before - public void bindQueueExchange() throws IOException { - openChannel(); - channel.exchangeDeclare("ex2", "direct", true, false, null); - channel.queueDeclare("q2", true, false, false, null); - channel.queueBind("q2", "ex2", "rk2"); - } - - @After - public void closeConnection() { - if (channel != null) { - try { - channel.close(); - } catch (IOException e) { - } - } - if (connection != null) { - try { - connection.close(); - } catch (IOException e) { - } - } - } - - private static final class LastDeliveryConsumer extends DefaultConsumer { - private byte[] lastBody; - - private LastDeliveryConsumer(Channel channel) { - super(channel); - } - - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - lastBody = body; - super.handleDelivery(consumerTag, envelope, properties, body); - } - - public byte[] getLastBody() { - return lastBody; - } - } - - @Test - public void testSendCsutomConnectionFactory() throws Exception { - String body = "Hello Rabbit"; - template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2"); - - openChannel(); - LastDeliveryConsumer consumer = new LastDeliveryConsumer(channel); - channel.basicConsume("q2", true, consumer); - int i = 10; - while (consumer.getLastBody() == null && i > 0) { - Thread.sleep(1000L); - i--; - } - assertEquals(body, new String(consumer.getLastBody())); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq; + +import com.rabbitmq.client.*; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.io.IOException; +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * Test RabbitMQ component with Spring DSL + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration +public class RabbitMQSpringIntTest { + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQSpringIntTest.class); + @Produce(uri = "direct:rabbitMQ") + protected ProducerTemplate template; + @Autowired + private ConnectionFactory connectionFactory; + private Connection connection; + private Channel channel; + private boolean isConnectionOpened() { + return connection!=null && connection.isOpen(); + } + private Connection openConnection() throws IOException { + if (!isConnectionOpened()) { + LOGGER.info("Open connection"); + connection = connectionFactory.newConnection(); + } + return connection; + } + private boolean isChannelOpened() { + return channel != null && channel.isOpen(); + } + private Channel openChannel() throws IOException { + if (!isChannelOpened()) { + LOGGER.info("Open channel"); + channel = openConnection().createChannel(); + } + return channel; + } + + @Before + public void bindQueueExchange() throws IOException { + openChannel(); + /* + LOGGER.info("Declare exchange queue"); + channel.exchangeDeclare("ex2", "direct", true, false, new HashMap<String, Object>()); + channel.queueDeclare("q2", true, false, false, null); + channel.queueBind("q2", "ex2", "rk2"); + */ + } + + @After + public void closeConnection() { + if (isChannelOpened()) { + try { + LOGGER.info("Close channel"); + channel.close(); + } catch (IOException e) { + } + } + if (isConnectionOpened()) { + try { + LOGGER.info("Close connection"); + connection.close(); + } catch (IOException e) { + } + } + } + + private static final class LastDeliveryConsumer extends DefaultConsumer { + private byte[] lastBody; + + private LastDeliveryConsumer(Channel channel) { + super(channel); + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + lastBody = body; + super.handleDelivery(consumerTag, envelope, properties, body); + } + + public byte[] getLastBody() { + return lastBody; + } + public String getLastBodyAsString() { + return lastBody == null ? null : new String(lastBody); + } + } + + @Test + public void testSendCsutomConnectionFactory() throws Exception { + String body = "Hello Rabbit"; + template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2"); + + openChannel(); + LastDeliveryConsumer consumer = new LastDeliveryConsumer(channel); + channel.basicConsume("q2", true, consumer); + int i = 10; + while (consumer.getLastBody() == null && i > 0) { + Thread.sleep(1000L); + i--; + } + assertEquals(body, consumer.getLastBodyAsString()); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml b/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml index 6810583..b4688c7 100644 --- a/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml +++ b/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml @@ -21,7 +21,10 @@ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> - + <!-- To create and grant user cameltest: + rabbitmqctl add_user cameltest cameltest + rabbitmqctl set_permissions -p / cameltest ".*" ".*" ".*" + --> <!-- START SNIPPET: custom connection factory --> <bean id="customConnectionFactory" class="com.rabbitmq.client.ConnectionFactory"> <property name="host" value="localhost"/> @@ -33,7 +36,7 @@ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:rabbitMQ"/> - <to uri="rabbitmq://localhost:5672/ex2?connectionFactory=#customConnectionFactory&queue=q2"/> + <to uri="rabbitmq://localhost:5672/ex2?connectionFactory=#customConnectionFactory&queue=q2&routingKey=rk2"/> </route> </camelContext> <!-- END SNIPPET: example -->