CAMEL-6869 merged the patch of Gerald
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/686d5fcc Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/686d5fcc Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/686d5fcc Branch: refs/heads/master Commit: 686d5fccac3e0308026548b58dfc90c31e41b5c9 Parents: 69b00a3 8029d0c Author: Willem Jiang <willem.ji...@gmail.com> Authored: Fri Jun 6 10:58:16 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Fri Jun 6 10:58:16 2014 +0800 ---------------------------------------------------------------------- components/camel-rabbitmq/pom.xml | 17 ++- .../component/rabbitmq/RabbitMQConsumer.java | 114 +++++++++++++---- .../component/rabbitmq/RabbitMQEndpoint.java | 36 ++++-- .../component/rabbitmq/RabbitMQProducer.java | 35 +++++- .../rabbitmq/RabbitMQEndpointTest.java | 36 +++--- .../rabbitmq/RabbitMQReConnectionIntTest.java | 102 ++++++++++++++++ .../rabbitmq/RabbitMQSpringIntTest.java | 122 +++++++++++++++++++ .../rabbitmq/RabbitMQSpringIntTest-context.xml | 40 ++++++ 8 files changed, 443 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --cc components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index 8a9b624,18ed62f..0f1d85f --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@@ -43,42 -48,56 +48,61 @@@ public class RabbitMQConsumer extends D } @Override - protected void doStart() throws Exception { - executor = endpoint.createExecutor(); - log.debug("Using executor {}", executor); - - conn = endpoint.connect(executor); - log.debug("Using conn {}", conn); + - channel = conn.createChannel(); - log.debug("Using channel {}", channel); + public RabbitMQEndpoint getEndpoint() { + return (RabbitMQEndpoint) super.getEndpoint(); + } + /** + * Open connection and channel + */ + private void openConnectionAndChannel() throws IOException { + log.trace("Creating connection..."); + this.conn = getEndpoint().connect(executor); + log.debug("Created connection: {}", conn); + + log.trace("Creating channel..."); + this.channel = conn.createChannel(); + log.debug("Created channel: {}", channel); - - getEndpoint().declareExchangeAndQueue(channel); - } ++ // setup the basicQos + if (endpoint.isPrefetchEnabled()) { + channel.basicQos(endpoint.getPrefetchSize(), + endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal()); + } - - channel.exchangeDeclare(endpoint.getExchangeName(), - endpoint.getExchangeType(), - endpoint.isDurable(), - endpoint.isAutoDelete(), - new HashMap<String, Object>()); - - // need to make sure the queueDeclare is same with the exchange declare - channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false, - endpoint.isAutoDelete(), null); - channel.queueBind( - endpoint.getQueue(), - endpoint.getExchangeName(), - endpoint.getRoutingKey() == null ? "" : endpoint - .getRoutingKey()); ++ getEndpoint().declareExchangeAndQueue(channel); ++ } + /** + * If needed, create Exchange and Queue, then add message listener + */ + private void addConsumer() throws IOException { channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), new RabbitConsumer(this, channel)); } @Override - protected void doStop() throws Exception { + protected void doStart() throws Exception { + executor = endpoint.createExecutor(); + log.debug("Using executor {}", executor); + try { + openConnectionAndChannel(); + addConsumer(); + } catch (Exception e) { + // Open connection, and start message listener in background + Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval(); - final long connectionRetryInterval= networkRecoveryInterval!=null && networkRecoveryInterval >0? networkRecoveryInterval :100L; - startConsumerCallable=new StartConsumerCallable(connectionRetryInterval); ++ final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L; ++ startConsumerCallable = new StartConsumerCallable(connectionRetryInterval); + executor.submit(startConsumerCallable); + } + } + + /** + * If needed, close Connection and Channel + */ + private void closeConnectionAndChannel() throws IOException { - if (startConsumerCallable!=null) { ++ if (startConsumerCallable != null) { + startConsumerCallable.stop(); + } if (channel != null) { log.debug("Closing channel: {}", channel); channel.close(); @@@ -183,4 -208,38 +213,38 @@@ } + /** + * Task in charge of opening connection and adding listener when consumer is started - * and broker is not avaiblable. ++ * and broker is not available. + */ + private class StartConsumerCallable implements Callable<Void> { + private final long connectionRetryInterval; - private final AtomicBoolean running=new AtomicBoolean(true); ++ private final AtomicBoolean running = new AtomicBoolean(true); + public StartConsumerCallable(long connectionRetryInterval) { + this.connectionRetryInterval = connectionRetryInterval; + } + public void stop() { + running.set(false); - RabbitMQConsumer.this.startConsumerCallable=null; ++ RabbitMQConsumer.this.startConsumerCallable = null; + } + @Override + public Void call() throws Exception { - boolean connectionFailed=true; ++ boolean connectionFailed = true; + // Reconnection loop + while (running.get() && connectionFailed) { + try { + openConnectionAndChannel(); - connectionFailed=false; ++ connectionFailed = false; + } catch (Exception e) { - log.debug("Connection failed, will retry in "+connectionRetryInterval+"ms", e); ++ log.debug("Connection failed, will retry in " + connectionRetryInterval + "ms", e); + Thread.sleep(connectionRetryInterval); + } + } + if (!connectionFailed) { + addConsumer(); + } + stop(); + return null; + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --cc components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index 9400935,7f87f53..e475819 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@@ -71,15 -73,7 +73,15 @@@ public class RabbitMQEndpoint extends D private Boolean automaticRecoveryEnabled; private Integer networkRecoveryInterval; private Boolean topologyRecoveryEnabled; - + + //If it is true, prefetchSize, prefetchCount, prefetchGlobal will be used for basicOqs before starting RabbitMQConsumer + private boolean prefetchEnabled; + //Default in RabbitMq is 0. + private int prefetchSize; + private int prefetchCount; + //Default value in RabbitMQ is false. + private boolean prefetchGlobal; - ++ public RabbitMQEndpoint() { } @@@ -134,7 -128,27 +136,27 @@@ } } - protected ConnectionFactory getOrCreateConnectionFactory() { - /** ++ /** + * If needed, declare Exchange, declare Queue and bind them with Routing Key + */ + public void declareExchangeAndQueue(Channel channel) throws IOException { + channel.exchangeDeclare(getExchangeName(), + getExchangeType(), + isDurable(), + isAutoDelete(), + new HashMap<String, Object>()); - if (getQueue()!=null) { ++ if (getQueue() != null) { + // need to make sure the queueDeclare is same with the exchange declare + channel.queueDeclare(getQueue(), isDurable(), false, + isAutoDelete(), null); + channel.queueBind( + getQueue(), + getExchangeName(), + getRoutingKey() == null ? "" : getRoutingKey()); + } + } + + private ConnectionFactory getOrCreateConnectionFactory() { if (connectionFactory == null) { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(getUsername()); @@@ -299,22 -313,22 +321,22 @@@ public void setRoutingKey(String routingKey) { this.routingKey = routingKey; } -- ++ public void setBridgeEndpoint(boolean bridgeEndpoint) { this.bridgeEndpoint = bridgeEndpoint; } -- ++ public boolean isBridgeEndpoint() { return bridgeEndpoint; } -- ++ public void setAddresses(String addresses) { Address[] addressArray = Address.parseAddresses(addresses); if (addressArray.length > 0) { this.addresses = addressArray; } } -- ++ public Address[] getAddresses() { return addresses; } http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --cc components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index 7763423,f8596c9..2e22d3f --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@@ -57,6 -56,8 +56,8 @@@ public class RabbitMQProducer extends D log.trace("Creating channel..."); this.channel = conn.createChannel(); log.debug("Created channel: {}", channel); + - getEndpoint().declareExchangeAndQueue(this.channel); ++ getEndpoint().declareExchangeAndQueue(this.channel); } @Override @@@ -95,6 -114,10 +114,10 @@@ byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class); AMQP.BasicProperties.Builder properties = buildProperties(exchange); - if (channel==null) { ++ if (channel == null) { + // Open connection and channel lazily + openConnectionAndChannel(); + } channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes); } http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java ---------------------------------------------------------------------- diff --cc components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java index 86a7bcc,2db5005..afae40d --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java @@@ -109,7 -109,7 +109,7 @@@ public class RabbitMQEndpointTest exten ThreadPoolExecutor executor = assertIsInstanceOf(ThreadPoolExecutor.class, endpoint.createExecutor()); assertEquals(20, executor.getCorePoolSize()); } -- ++ @Test public void createEndpointWithAutoAckDisabled() throws Exception { RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?autoAck=false", RabbitMQEndpoint.class); @@@ -122,7 -122,7 +122,7 @@@ assertTrue(endpoint.isSingleton()); } -- ++ @Test public void brokerEndpointAddressesSettings() throws Exception { RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?addresses=server1:12345,server2:12345", RabbitMQEndpoint.class); @@@ -133,7 -133,13 +133,13 @@@ private ConnectionFactory createConnectionFactory(String uri) { RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class); - return endpoint.getOrCreateConnectionFactory(); - try { - endpoint.connect(Executors.newSingleThreadExecutor()); - } catch(IOException ioExc) { - // Doesn't matter if RabbitMQ is not available - log.debug("RabbitMQ not available", ioExc); - } ++ try { ++ endpoint.connect(Executors.newSingleThreadExecutor()); ++ } catch (IOException ioExc) { ++ // Doesn't matter if RabbitMQ is not available ++ log.debug("RabbitMQ not available", ioExc); ++ } + return endpoint.getConnectionFactory(); } @Test @@@ -157,17 -163,17 +163,17 @@@ @Test public void testCreateConnectionFactoryCustom() throws Exception { -- ConnectionFactory connectionFactory = createConnectionFactory("rabbitmq:localhost:1234/exchange" - + "?username=userxxx" - + "&password=passxxx" - + "&connectionTimeout=123" - + "&requestedChannelMax=456" - + "&requestedFrameMax=789" - + "&requestedHeartbeat=987" - + "&sslProtocol=true" - + "&automaticRecoveryEnabled=true" - + "&networkRecoveryInterval=654" - + "&topologyRecoveryEnabled=false"); - + "?username=userxxx" - + "&password=passxxx" - + "&connectionTimeout=123" - + "&requestedChannelMax=456" - + "&requestedFrameMax=789" - + "&requestedHeartbeat=987" - + "&sslProtocol=true" - + "&automaticRecoveryEnabled=true" - + "&networkRecoveryInterval=654" - + "&topologyRecoveryEnabled=false"); ++ ConnectionFactory connectionFactory = createConnectionFactory("rabbitmq:localhost:1234/exchange" ++ + "?username=userxxx" ++ + "&password=passxxx" ++ + "&connectionTimeout=123" ++ + "&requestedChannelMax=456" ++ + "&requestedFrameMax=789" ++ + "&requestedHeartbeat=987" ++ + "&sslProtocol=true" ++ + "&automaticRecoveryEnabled=true" ++ + "&networkRecoveryInterval=654" ++ + "&topologyRecoveryEnabled=false"); assertEquals("localhost", connectionFactory.getHost()); assertEquals(1234, connectionFactory.getPort()); http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java ---------------------------------------------------------------------- diff --cc components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java index 0000000,512aed4..302440c mode 000000,100644..100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java @@@ -1,0 -1,83 +1,102 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ + package org.apache.camel.component.rabbitmq; + ++import java.net.ConnectException; ++import java.util.concurrent.TimeUnit; ++ + import com.rabbitmq.client.AlreadyClosedException; -import org.apache.camel.*; ++ ++import org.apache.camel.CamelExecutionException; ++import org.apache.camel.Endpoint; ++import org.apache.camel.EndpointInject; ++import org.apache.camel.Produce; ++import org.apache.camel.ProducerTemplate; + import org.apache.camel.builder.RouteBuilder; + import org.apache.camel.component.mock.MockEndpoint; + import org.apache.camel.test.junit4.CamelTestSupport; + import org.junit.Test; + -import java.net.ConnectException; -import java.util.concurrent.TimeUnit; - + /** + * Integration test to check that RabbitMQ Endpoint is able to reconnect to broker when broker + * is not avaibable. + * <ul> + * <li>Stop the broker</li> + * <li>Run the test: the producer complains it can not send messages, the consumer is silent</li> + * <li>Start the broker: the producer sends messages, and the consumer receives messages</li> + * <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li> + * <li>Start the broker: the producer sends messages, and the consumer receives messages</li> + * </ul> + */ + public class RabbitMQReConnectionIntTest extends CamelTestSupport { + private static final String EXCHANGE = "ex3"; + + @Produce(uri = "direct:rabbitMQ") + protected ProducerTemplate directProducer; + - @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest" + - "&queue=q3&routingKey=rk3" + - "&automaticRecoveryEnabled=true" + - "&requestedHeartbeat=1000" + - "&connectionTimeout=5000") ++ @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest" ++ + "&queue=q3&routingKey=rk3" + "&automaticRecoveryEnabled=true" ++ + "&requestedHeartbeat=1000" + "&connectionTimeout=5000") + private Endpoint rabbitMQEndpoint; + + @EndpointInject(uri = "mock:producing") + private MockEndpoint producingMockEndpoint; + + @EndpointInject(uri = "mock:consuming") + private MockEndpoint consumingMockEndpoint; + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + from("direct:rabbitMQ") + .id("producingRoute") + .onException(AlreadyClosedException.class, ConnectException.class) + .maximumRedeliveries(10) + .redeliveryDelay(500L) + .end() + .log("Sending message") + .inOnly(rabbitMQEndpoint) + .to(producingMockEndpoint); + from(rabbitMQEndpoint) + .id("consumingRoute") + .log("Receiving message") + .to(consumingMockEndpoint); + } + }; + } + + @Test + public void testSendEndReceive() throws Exception { + int nbMessages = 50; + int failedMessages = 0; + for (int i = 0; i < nbMessages; i++) { + try { + directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY, "rk3"); + } catch (CamelExecutionException e) { + log.debug("Can not send message", e); + failedMessages++; + } + Thread.sleep(500L); + } + producingMockEndpoint.expectedMessageCount(nbMessages - failedMessages); + consumingMockEndpoint.expectedMessageCount(nbMessages - failedMessages); + assertMockEndpointsSatisfied(5, TimeUnit.SECONDS); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java ---------------------------------------------------------------------- diff --cc components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java index 0000000,baa4bde..6119082 mode 000000,100644..100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java @@@ -1,0 -1,102 +1,122 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ + package org.apache.camel.component.rabbitmq; + -import com.rabbitmq.client.*; ++import java.io.IOException; ++ ++import com.rabbitmq.client.AMQP; ++import com.rabbitmq.client.Channel; ++import com.rabbitmq.client.Connection; ++import com.rabbitmq.client.ConnectionFactory; ++import com.rabbitmq.client.DefaultConsumer; ++import com.rabbitmq.client.Envelope; ++ + import org.apache.camel.Produce; + import org.apache.camel.ProducerTemplate; + import org.apache.camel.test.spring.CamelSpringJUnit4ClassRunner; + import org.junit.After; + import org.junit.Before; + import org.junit.Test; + import org.junit.runner.RunWith; + import org.springframework.beans.factory.annotation.Autowired; + import org.springframework.test.context.ContextConfiguration; - -import java.io.IOException; - + import static org.junit.Assert.assertEquals; - + /** + * Test RabbitMQ component with Spring DSL + */ + @RunWith(CamelSpringJUnit4ClassRunner.class) + @ContextConfiguration("RabbitMQSpringIntTest-context.xml") + public class RabbitMQSpringIntTest { + @Produce(uri = "direct:rabbitMQ") + protected ProducerTemplate template; + @Autowired + private ConnectionFactory connectionFactory; + private Connection connection; + private Channel channel; + + private Connection openConnection() throws IOException { + if (connection == null) { + connection = connectionFactory.newConnection(); + } + return connection; + } + + private Channel openChannel() throws IOException { + if (channel == null) { + channel = openConnection().createChannel(); + } + return channel; + } + + @Before + public void bindQueueExchange() throws IOException { + openChannel(); + channel.exchangeDeclare("ex2", "direct", true, false, null); + channel.queueDeclare("q2", true, false, false, null); + channel.queueBind("q2", "ex2", "rk2"); + } + + @After + public void closeConnection() { + if (channel != null) { + try { + channel.close(); + } catch (IOException e) { + } + } + if (connection != null) { + try { + connection.close(); + } catch (IOException e) { + } + } + } + - private static class LastDeliveryConsumer extends DefaultConsumer { ++ private static final class LastDeliveryConsumer extends DefaultConsumer { + private byte[] lastBody; + + private LastDeliveryConsumer(Channel channel) { + super(channel); + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + lastBody = body; + super.handleDelivery(consumerTag, envelope, properties, body); + } + + public byte[] getLastBody() { + return lastBody; + } + } + + @Test + public void testSendCsutomConnectionFactory() throws Exception { + String body = "Hello Rabbit"; + template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2"); + + openChannel(); + LastDeliveryConsumer consumer = new LastDeliveryConsumer(channel); + channel.basicConsume("q2", true, consumer); + int i = 10; + while (consumer.getLastBody() == null && i > 0) { + Thread.sleep(1000L); + i--; + } + assertEquals(body, new String(consumer.getLastBody())); + } + }