Repository: camel Updated Branches: refs/heads/master 913d589b6 -> a0b500a4f
CAMEL-7384 Allow connection factory tuning with thanks to Gérald Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a0b500a4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a0b500a4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a0b500a4 Branch: refs/heads/master Commit: a0b500a4f2e138f1a66e8b408df0f1b27433fa68 Parents: 913d589 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Fri Apr 25 16:26:12 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Fri Apr 25 16:26:12 2014 +0800 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQComponent.java | 19 +- .../component/rabbitmq/RabbitMQEndpoint.java | 174 +++++++++++++++++-- .../rabbitmq/RabbitMQComponentTest.java | 37 ++++ .../rabbitmq/RabbitMQEndpointTest.java | 56 ++++++ 4 files changed, 268 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a0b500a4/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java index 56db13d..585a07f 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java @@ -19,6 +19,10 @@ package org.apache.camel.component.rabbitmq; import java.net.URI; import java.util.Map; +import javax.net.ssl.TrustManager; + +import com.rabbitmq.client.ConnectionFactory; + import org.apache.camel.CamelContext; import org.apache.camel.impl.DefaultComponent; import org.slf4j.Logger; @@ -44,11 +48,22 @@ public class RabbitMQComponent extends DefaultComponent { int portNumber = host.getPort(); String exchangeName = host.getPath().substring(1); - RabbitMQEndpoint endpoint = new RabbitMQEndpoint(uri, this); + // ConnectionFactory reference + ConnectionFactory connectionFactory = resolveAndRemoveReferenceParameter(params, "connectionFactory", ConnectionFactory.class); + @SuppressWarnings("unchecked") + Map<String, Object> clientProperties = resolveAndRemoveReferenceParameter(params, "clientProperties", Map.class); + TrustManager trustManager = resolveAndRemoveReferenceParameter(params, "trustManager", TrustManager.class); + RabbitMQEndpoint endpoint; + if (connectionFactory == null) { + endpoint = new RabbitMQEndpoint(uri, this); + } else { + endpoint = new RabbitMQEndpoint(uri, this, connectionFactory); + } endpoint.setHostname(hostname); endpoint.setPortNumber(portNumber); endpoint.setExchangeName(exchangeName); - + endpoint.setClientProperties(clientProperties); + endpoint.setTrustManager(trustManager); setProperties(endpoint, params); if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/camel/blob/a0b500a4/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index 338c1c5..646a633 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -18,11 +18,15 @@ package org.apache.camel.component.rabbitmq; import java.io.IOException; import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import javax.net.ssl.TrustManager; + import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Address; import com.rabbitmq.client.Connection; @@ -41,9 +45,9 @@ import org.apache.camel.impl.DefaultMessage; public class RabbitMQEndpoint extends DefaultEndpoint { - private String username; - private String password; - private String vhost; + private String username = ConnectionFactory.DEFAULT_USER; + private String password = ConnectionFactory.DEFAULT_PASS; + private String vhost = ConnectionFactory.DEFAULT_VHOST; private String hostname; private int threadPoolSize = 10; private int portNumber; @@ -56,7 +60,18 @@ public class RabbitMQEndpoint extends DefaultEndpoint { private String exchangeType = "direct"; private String routingKey; private Address[] addresses; - + private int connectionTimeout = ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT; + private int requestedChannelMax = ConnectionFactory.DEFAULT_CHANNEL_MAX; + private int requestedFrameMax = ConnectionFactory.DEFAULT_FRAME_MAX; + private int requestedHeartbeat = ConnectionFactory.DEFAULT_HEARTBEAT; + private String sslProtocol; + private TrustManager trustManager; + private Map<String, Object> clientProperties; + private ConnectionFactory connectionFactory; + private Boolean automaticRecoveryEnabled; + private Integer networkRecoveryInterval; + private Boolean topologyRecoveryEnabled; + public RabbitMQEndpoint() { } @@ -64,6 +79,11 @@ public class RabbitMQEndpoint extends DefaultEndpoint { super(endpointUri, component); } + public RabbitMQEndpoint(String endpointUri, RabbitMQComponent component, ConnectionFactory connectionFactory) throws URISyntaxException { + super(endpointUri, component); + this.connectionFactory = connectionFactory; + } + public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) { Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern()); @@ -99,21 +119,55 @@ public class RabbitMQEndpoint extends DefaultEndpoint { } public Connection connect(ExecutorService executor) throws IOException { - ConnectionFactory factory = new ConnectionFactory(); - factory.setUsername(getUsername()); - factory.setPassword(getPassword()); - if (getVhost() == null) { - factory.setVirtualHost("/"); - } else { - factory.setVirtualHost(getVhost()); - } - factory.setHost(getHostname()); - factory.setPort(getPortNumber()); if (getAddresses() == null) { - return factory.newConnection(executor); + return getOrCreateConnectionFactory().newConnection(executor); } else { - return factory.newConnection(executor, getAddresses()); + return getOrCreateConnectionFactory().newConnection(executor, getAddresses()); + } + } + + private ConnectionFactory getOrCreateConnectionFactory() { + if (connectionFactory == null) { + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername(getUsername()); + factory.setPassword(getPassword()); + factory.setVirtualHost(getVhost()); + factory.setHost(getHostname()); + factory.setPort(getPortNumber()); + if (getClientProperties() != null) { + factory.setClientProperties(getClientProperties()); + } + factory.setConnectionTimeout(getConnectionTimeout()); + factory.setRequestedChannelMax(getRequestedChannelMax()); + factory.setRequestedFrameMax(getRequestedFrameMax()); + factory.setRequestedHeartbeat(getRequestedHeartbeat()); + if (getSslProtocol() != null) { + try { + if (getSslProtocol().equals("true")) { + factory.useSslProtocol(); + } else if (getTrustManager() == null) { + factory.useSslProtocol(getSslProtocol()); + } else { + factory.useSslProtocol(getSslProtocol(), getTrustManager()); + } + } catch (NoSuchAlgorithmException e) { + throw new IllegalArgumentException("Invalid sslProtocol " + sslProtocol, e); + } catch (KeyManagementException e) { + throw new IllegalArgumentException("Invalid sslProtocol " + sslProtocol, e); + } + } + if (getAutomaticRecoveryEnabled() != null) { + factory.setAutomaticRecoveryEnabled(getAutomaticRecoveryEnabled()); + } + if (getNetworkRecoveryInterval() != null) { + factory.setNetworkRecoveryInterval(getNetworkRecoveryInterval()); + } + if (getTopologyRecoveryEnabled() != null) { + factory.setTopologyRecoveryEnabled(getTopologyRecoveryEnabled()); + } + connectionFactory = factory; } + return connectionFactory; } @Override @@ -256,4 +310,92 @@ public class RabbitMQEndpoint extends DefaultEndpoint { public Address[] getAddresses() { return addresses; } + + public int getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public int getRequestedChannelMax() { + return requestedChannelMax; + } + + public void setRequestedChannelMax(int requestedChannelMax) { + this.requestedChannelMax = requestedChannelMax; + } + + public int getRequestedFrameMax() { + return requestedFrameMax; + } + + public void setRequestedFrameMax(int requestedFrameMax) { + this.requestedFrameMax = requestedFrameMax; + } + + public int getRequestedHeartbeat() { + return requestedHeartbeat; + } + + public void setRequestedHeartbeat(int requestedHeartbeat) { + this.requestedHeartbeat = requestedHeartbeat; + } + + public String getSslProtocol() { + return sslProtocol; + } + + public void setSslProtocol(String sslProtocol) { + this.sslProtocol = sslProtocol; + } + + public ConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + public void setConnectionFactory(ConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } + + public TrustManager getTrustManager() { + return trustManager; + } + + public void setTrustManager(TrustManager trustManager) { + this.trustManager = trustManager; + } + + public Map<String, Object> getClientProperties() { + return clientProperties; + } + + public void setClientProperties(Map<String, Object> clientProperties) { + this.clientProperties = clientProperties; + } + + public Boolean getAutomaticRecoveryEnabled() { + return automaticRecoveryEnabled; + } + + public void setAutomaticRecoveryEnabled(Boolean automaticRecoveryEnabled) { + this.automaticRecoveryEnabled = automaticRecoveryEnabled; + } + + public Integer getNetworkRecoveryInterval() { + return networkRecoveryInterval; + } + + public void setNetworkRecoveryInterval(Integer networkRecoveryInterval) { + this.networkRecoveryInterval = networkRecoveryInterval; + } + + public Boolean getTopologyRecoveryEnabled() { + return topologyRecoveryEnabled; + } + + public void setTopologyRecoveryEnabled(Boolean topologyRecoveryEnabled) { + this.topologyRecoveryEnabled = topologyRecoveryEnabled; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/a0b500a4/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java index 454a26d..5506aab 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java @@ -19,11 +19,17 @@ package org.apache.camel.component.rabbitmq; import java.util.HashMap; import java.util.Map; +import com.rabbitmq.client.ConnectionFactory; + import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; import org.junit.Test; import org.mockito.Mockito; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; public class RabbitMQComponentTest { @@ -39,6 +45,11 @@ public class RabbitMQComponentTest { assertEquals(true, endpoint.isAutoDelete()); assertEquals(true, endpoint.isDurable()); assertEquals("direct", endpoint.getExchangeType()); + assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, endpoint.getConnectionTimeout()); + assertEquals(ConnectionFactory.DEFAULT_CHANNEL_MAX, endpoint.getRequestedChannelMax()); + assertEquals(ConnectionFactory.DEFAULT_FRAME_MAX, endpoint.getRequestedFrameMax()); + assertEquals(ConnectionFactory.DEFAULT_HEARTBEAT, endpoint.getRequestedHeartbeat()); + assertNull(endpoint.getConnectionFactory()); } @Test @@ -53,6 +64,10 @@ public class RabbitMQComponentTest { params.put("hostname", "special.host"); params.put("queue", "queuey"); params.put("exchangeType", "topic"); + params.put("connectionTimeout", 123); + params.put("requestedChannelMax", 456); + params.put("requestedFrameMax", 789); + params.put("requestedHeartbeat", 321); RabbitMQEndpoint endpoint = createEndpoint(params); @@ -67,6 +82,10 @@ public class RabbitMQComponentTest { assertEquals(true, endpoint.isAutoDelete()); assertEquals(true, endpoint.isDurable()); assertEquals("topic", endpoint.getExchangeType()); + assertEquals(123, endpoint.getConnectionTimeout()); + assertEquals(456, endpoint.getRequestedChannelMax()); + assertEquals(789, endpoint.getRequestedFrameMax()); + assertEquals(321, endpoint.getRequestedHeartbeat()); } private RabbitMQEndpoint createEndpoint(Map<String, Object> params) throws Exception { @@ -75,4 +94,22 @@ public class RabbitMQComponentTest { return new RabbitMQComponent(context).createEndpoint(uri, remaining, params); } + + @Test + public void testConnectionFactoryRef() throws Exception { + SimpleRegistry registry = new SimpleRegistry(); + ConnectionFactory connectionFactoryMock = Mockito.mock(ConnectionFactory.class); + registry.put("connectionFactoryMock", connectionFactoryMock); + + CamelContext defaultContext = new DefaultCamelContext(registry); + + Map<String, Object> params = new HashMap<String, Object>(); + params.put("connectionFactory", "#connectionFactoryMock"); + + RabbitMQEndpoint endpoint = new RabbitMQComponent(defaultContext).createEndpoint("rabbitmq:localhost/exchange", "localhost/exchange", params); + + assertSame(connectionFactoryMock, endpoint.getConnectionFactory()); + + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/a0b500a4/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java index 21c97f2..11b3675 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java @@ -21,12 +21,16 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Address; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.impl.LongStringHelper; + import org.apache.camel.Exchange; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; @@ -126,4 +130,56 @@ public class RabbitMQEndpointTest extends CamelTestSupport { assertEquals("Get a wrong endpoint address.", new Address("server1", 12345), endpoint.getAddresses()[0]); assertEquals("Get a wrong endpoint address.", new Address("server2", 12345), endpoint.getAddresses()[1]); } + + private ConnectionFactory createConnectionFactory(String uri) { + RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class); + return endpoint.getConnectionFactory(); + } + + @Test + public void testCreateConnectionFactoryDefault() throws Exception { + ConnectionFactory connectionFactory = createConnectionFactory("rabbitmq:localhost:1234/exchange"); + + assertEquals("localhost", connectionFactory.getHost()); + assertEquals(1234, connectionFactory.getPort()); + assertEquals(ConnectionFactory.DEFAULT_VHOST, connectionFactory.getVirtualHost()); + assertEquals(ConnectionFactory.DEFAULT_USER, connectionFactory.getUsername()); + assertEquals(ConnectionFactory.DEFAULT_PASS, connectionFactory.getPassword()); + assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, connectionFactory.getConnectionTimeout()); + assertEquals(ConnectionFactory.DEFAULT_CHANNEL_MAX, connectionFactory.getRequestedChannelMax()); + assertEquals(ConnectionFactory.DEFAULT_FRAME_MAX, connectionFactory.getRequestedFrameMax()); + assertEquals(ConnectionFactory.DEFAULT_HEARTBEAT, connectionFactory.getRequestedHeartbeat()); + assertFalse(connectionFactory.isSSL()); + assertFalse(connectionFactory.isAutomaticRecoveryEnabled()); + assertEquals(5000, connectionFactory.getNetworkRecoveryInterval()); + assertTrue(connectionFactory.isTopologyRecoveryEnabled()); + } + + @Test + public void testCreateConnectionFactoryCustom() throws Exception { + ConnectionFactory connectionFactory = createConnectionFactory("rabbitmq:localhost:1234/exchange" + + "?username=userxxx" + + "&password=passxxx" + + "&connectionTimeout=123" + + "&requestedChannelMax=456" + + "&requestedFrameMax=789" + + "&requestedHeartbeat=987" + + "&sslProtocol=true" + + "&automaticRecoveryEnabled=true" + + "&networkRecoveryInterval=654" + + "&topologyRecoveryEnabled=false"); + + assertEquals("localhost", connectionFactory.getHost()); + assertEquals(1234, connectionFactory.getPort()); + assertEquals("userxxx", connectionFactory.getUsername()); + assertEquals("passxxx", connectionFactory.getPassword()); + assertEquals(123, connectionFactory.getConnectionTimeout()); + assertEquals(456, connectionFactory.getRequestedChannelMax()); + assertEquals(789, connectionFactory.getRequestedFrameMax()); + assertEquals(987, connectionFactory.getRequestedHeartbeat()); + assertTrue(connectionFactory.isSSL()); + assertTrue(connectionFactory.isAutomaticRecoveryEnabled()); + assertEquals(654, connectionFactory.getNetworkRecoveryInterval()); + assertFalse(connectionFactory.isTopologyRecoveryEnabled()); + } }