Repository: camel Updated Branches: refs/heads/master 490478749 -> fb59878eb
Component docs Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fb59878e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fb59878e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fb59878e Branch: refs/heads/master Commit: fb59878ebcc0645d463060b6e70835dafcb1391b Parents: 4904787 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue May 12 10:25:26 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue May 12 10:25:26 2015 +0200 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQEndpoint.java | 195 ++++++++++++++----- 1 file changed, 146 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/fb59878e/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index 69c1dc5..7638682 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -52,7 +52,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint { @UriPath @Metadata(required = "true") private String hostname; - @UriPath @Metadata(required = "true") + @UriPath(defaultValue = "5672") @Metadata(required = "true") private int portNumber; @UriPath @Metadata(required = "true") private String exchangeName; @@ -62,19 +62,19 @@ public class RabbitMQEndpoint extends DefaultEndpoint { private String password = ConnectionFactory.DEFAULT_PASS; @UriParam(defaultValue = ConnectionFactory.DEFAULT_VHOST) private String vhost = ConnectionFactory.DEFAULT_VHOST; - @UriParam(defaultValue = "10") + @UriParam(label = "consumer", defaultValue = "10") private int threadPoolSize = 10; - @UriParam(defaultValue = "true") + @UriParam(label = "consumer", defaultValue = "true") private boolean autoAck = true; @UriParam(defaultValue = "true") private boolean autoDelete = true; @UriParam(defaultValue = "true") private boolean durable = true; - @UriParam + @UriParam(label = "producer") private boolean bridgeEndpoint; @UriParam private String queue = String.valueOf(UUID.randomUUID().toString().hashCode()); - @UriParam(defaultValue = "direct") + @UriParam(defaultValue = "direct", enums = "direct,fanout,headers,topic") private String exchangeType = "direct"; @UriParam private String routingKey; @@ -102,46 +102,33 @@ public class RabbitMQEndpoint extends DefaultEndpoint { private Integer networkRecoveryInterval; @UriParam private Boolean topologyRecoveryEnabled; - //If it is true, prefetchSize, prefetchCount, prefetchGlobal will be used for basicOqs before starting RabbitMQConsumer - @UriParam + @UriParam(label = "consumer") private boolean prefetchEnabled; - //Default in RabbitMq is 0. - @UriParam + @UriParam(label = "consumer") private int prefetchSize; - @UriParam + @UriParam(label = "consumer") private int prefetchCount; - //Default value in RabbitMQ is false. - @UriParam + @UriParam(label = "consumer") private boolean prefetchGlobal; - /** - * Number of concurrent consumer threads - */ - @UriParam(defaultValue = "1") + @UriParam(label = "consumer", defaultValue = "1") private int concurrentConsumers = 1; - //Declares a queue and exchange in RabbitMQ, then binds both. @UriParam(defaultValue = "true") private boolean declare = true; - //Declare dead letter exchange. @UriParam private String deadLetterExchange; - //Declare dead letter routing key. @UriParam private String deadLetterRoutingKey; - //Declare dead letter queue to declare. @UriParam private String deadLetterQueue; - //Dead letter exchange type. - @UriParam(defaultValue = "direct") + @UriParam(defaultValue = "direct", enums = "direct,fanout,headers,topic") private String deadLetterExchangeType = "direct"; - //Maximum number of opened channel in pool - @UriParam(defaultValue = "10") + @UriParam(label = "producer", defaultValue = "10") private int channelPoolMaxSize = 10; - //Maximum time (in milliseconds) waiting for channel - @UriParam(defaultValue = "1000") + @UriParam(label = "producer", defaultValue = "1000") private long channelPoolMaxWait = 1000; - @UriParam + @UriParam(label = "producer") private boolean mandatory; - @UriParam + @UriParam(label = "producer") private boolean immediate; @UriParam private ArgsConfigurer queueArgsConfigurer; @@ -314,6 +301,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return username; } + /** + * Username in case of authenticated access + */ public void setUsername(String username) { this.username = username; } @@ -322,6 +312,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return password; } + /** + * Password for authenticated access + */ public void setPassword(String password) { this.password = password; } @@ -330,6 +323,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return vhost; } + /** + * The vhost for the channel + */ public void setVhost(String vhost) { this.vhost = vhost; } @@ -338,6 +334,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return hostname; } + /** + * The hostname of the running rabbitmq instance or cluster. + */ public void setHostname(String hostname) { this.hostname = hostname; } @@ -346,6 +345,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return threadPoolSize; } + /** + * The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads. + */ public void setThreadPoolSize(int threadPoolSize) { this.threadPoolSize = threadPoolSize; } @@ -354,6 +356,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return portNumber; } + /** + * Port number for the host with the running rabbitmq instance or cluster. Default value is 5672. + */ public void setPortNumber(int portNumber) { this.portNumber = portNumber; } @@ -362,6 +367,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return autoAck; } + /** + * If messages should be auto acknowledged + */ public void setAutoAck(boolean autoAck) { this.autoAck = autoAck; } @@ -370,6 +378,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return autoDelete; } + /** + * If it is true, the exchange will be deleted when it is no longer in use + */ public void setAutoDelete(boolean autoDelete) { this.autoDelete = autoDelete; } @@ -378,6 +389,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return durable; } + /** + * If we are declaring a durable exchange (the exchange will survive a server restart) + */ public void setDurable(boolean durable) { this.durable = durable; } @@ -386,6 +400,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return queue; } + /** + * The queue to receive messages from + */ public void setQueue(String queue) { this.queue = queue; } @@ -394,6 +411,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return exchangeName; } + /** + * The exchange name determines which exchange produced messages will sent to. + * In the case of consumers, the exchange name determines which exchange the queue will bind to. + */ public void setExchangeName(String exchangeName) { this.exchangeName = exchangeName; } @@ -402,6 +423,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return exchangeType; } + /** + * The exchange type such as direct or topic. + */ public void setExchangeType(String exchangeType) { this.exchangeType = exchangeType; } @@ -410,10 +434,17 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return routingKey; } + /** + * The routing key to use when binding a consumer queue to the exchange. + * For producer routing keys, you set the header rabbitmq.ROUTING_KEY. + */ public void setRoutingKey(String routingKey) { this.routingKey = routingKey; } + /** + * If the bridgeEndpoint is true, the producer will ignore the message header of "rabbitmq.EXCHANGE_NAME" and "rabbitmq.ROUTING_KEY" + */ public void setBridgeEndpoint(boolean bridgeEndpoint) { this.bridgeEndpoint = bridgeEndpoint; } @@ -422,6 +453,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return bridgeEndpoint; } + /** + * If this option is set, camel-rabbitmq will try to create connection based on the setting of option addresses. + * The addresses value is a string which looks like "server1:12345, server2:12345" + */ public void setAddresses(String addresses) { Address[] addressArray = Address.parseAddresses(addresses); if (addressArray.length > 0) { @@ -437,6 +472,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return connectionTimeout; } + /** + * Connection timeout + */ public void setConnectionTimeout(int connectionTimeout) { this.connectionTimeout = connectionTimeout; } @@ -445,6 +483,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return requestedChannelMax; } + /** + * Connection requested channel max (max number of channels offered) + */ public void setRequestedChannelMax(int requestedChannelMax) { this.requestedChannelMax = requestedChannelMax; } @@ -453,6 +494,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return requestedFrameMax; } + /** + * Connection requested frame max (max size of frame offered) + */ public void setRequestedFrameMax(int requestedFrameMax) { this.requestedFrameMax = requestedFrameMax; } @@ -461,6 +505,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return requestedHeartbeat; } + /** + * Connection requested heartbeat (heart-beat in seconds offered) + */ public void setRequestedHeartbeat(int requestedHeartbeat) { this.requestedHeartbeat = requestedHeartbeat; } @@ -469,6 +516,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return sslProtocol; } + /** + * Enables SSL on connection, accepted value are `true`, `TLS` and 'SSLv3` + */ public void setSslProtocol(String sslProtocol) { this.sslProtocol = sslProtocol; } @@ -477,6 +527,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return connectionFactory; } + /** + * To use a custom RabbitMQ connection factory. + * When this option is set, all connection options (connectionTimeout, requestedChannelMax...) set on URI are not used + */ public void setConnectionFactory(ConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } @@ -485,6 +539,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return trustManager; } + /** + * Configure SSL trust manager, SSL should be enabled for this option to be effective + */ public void setTrustManager(TrustManager trustManager) { this.trustManager = trustManager; } @@ -493,6 +550,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return clientProperties; } + /** + * Connection client properties (client info used in negotiating with the server) + */ public void setClientProperties(Map<String, Object> clientProperties) { this.clientProperties = clientProperties; } @@ -501,6 +561,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return automaticRecoveryEnabled; } + /** + * Enables connection automatic recovery (uses connection implementation that performs automatic recovery when connection shutdown is not initiated by the application) + */ public void setAutomaticRecoveryEnabled(Boolean automaticRecoveryEnabled) { this.automaticRecoveryEnabled = automaticRecoveryEnabled; } @@ -509,6 +572,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return networkRecoveryInterval; } + /** + * Network recovery interval in milliseconds (interval used when recovering from network failure) + */ public void setNetworkRecoveryInterval(Integer networkRecoveryInterval) { this.networkRecoveryInterval = networkRecoveryInterval; } @@ -517,6 +583,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return topologyRecoveryEnabled; } + /** + * Enables connection topology recovery (should topology recovery be performed?) + */ public void setTopologyRecoveryEnabled(Boolean topologyRecoveryEnabled) { this.topologyRecoveryEnabled = topologyRecoveryEnabled; } @@ -525,10 +594,18 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return prefetchEnabled; } + /** + * Enables the quality of service on the RabbitMQConsumer side. + * You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time + */ public void setPrefetchEnabled(boolean prefetchEnabled) { this.prefetchEnabled = prefetchEnabled; } + /** + * The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. + * You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time + */ public void setPrefetchSize(int prefetchSize) { this.prefetchSize = prefetchSize; } @@ -537,6 +614,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return prefetchSize; } + /** + * The maximum number of messages that the server will deliver, 0 if unlimited. + * You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time + */ public void setPrefetchCount(int prefetchCount) { this.prefetchCount = prefetchCount; } @@ -545,6 +626,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return prefetchCount; } + /** + * If the settings should be applied to the entire channel rather than each consumer + * You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time + */ public void setPrefetchGlobal(boolean prefetchGlobal) { this.prefetchGlobal = prefetchGlobal; } @@ -557,6 +642,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return concurrentConsumers; } + /** + * Number of concurrent consumers when consuming from broker. (eg similar as to the same option for the JMS component). + */ public void setConcurrentConsumers(int concurrentConsumers) { this.concurrentConsumers = concurrentConsumers; } @@ -565,6 +653,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return declare; } + /** + * If the option is true, camel declare the exchange and queue name and bind them together. + * If the option is false, camel won't declare the exchange and queue name on the server. + */ public void setDeclare(boolean declare) { this.declare = declare; } @@ -573,6 +665,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return deadLetterExchange; } + /** + * The name of the dead letter exchange + */ public void setDeadLetterExchange(String deadLetterExchange) { this.deadLetterExchange = deadLetterExchange; } @@ -581,6 +676,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return deadLetterQueue; } + /** + * The name of the dead letter queue + */ public void setDeadLetterQueue(String deadLetterQueue) { this.deadLetterQueue = deadLetterQueue; } @@ -589,6 +687,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return deadLetterRoutingKey; } + /** + * The routing key for the dead letter exchange + */ public void setDeadLetterRoutingKey(String deadLetterRoutingKey) { this.deadLetterRoutingKey = deadLetterRoutingKey; } @@ -597,14 +698,15 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return deadLetterExchangeType; } + /** + * The type of the dead letter exchange + */ public void setDeadLetterExchangeType(String deadLetterExchangeType) { this.deadLetterExchangeType = deadLetterExchangeType; } /** * Get maximum number of opened channel in pool - * - * @return Maximum number of opened channel in pool */ public int getChannelPoolMaxSize() { return channelPoolMaxSize; @@ -612,26 +714,17 @@ public class RabbitMQEndpoint extends DefaultEndpoint { /** * Set maximum number of opened channel in pool - * - * @param channelPoolMaxSize Maximum number of opened channel in pool */ public void setChannelPoolMaxSize(int channelPoolMaxSize) { this.channelPoolMaxSize = channelPoolMaxSize; } - /** - * Get the maximum number of milliseconds to wait for a channel from the pool - * - * @return Maximum number of milliseconds waiting for a channel - */ public long getChannelPoolMaxWait() { return channelPoolMaxWait; } /** * Set the maximum number of milliseconds to wait for a channel from the pool - * - * @param channelPoolMaxWait Maximum number of milliseconds waiting for a channel */ public void setChannelPoolMaxWait(long channelPoolMaxWait) { this.channelPoolMaxWait = channelPoolMaxWait; @@ -641,6 +734,13 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return mandatory; } + /** + * This flag tells the server how to react if the message cannot be routed to a queue. + * If this flag is set, the server will return an unroutable message with a Return method. + * If this flag is zero, the server silently drops the message. + * <p/> + * If the header is present rabbitmq.MANDATORY it will override this option. + */ public void setMandatory(boolean mandatory) { this.mandatory = mandatory; } @@ -649,37 +749,34 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return immediate; } + /** + * This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. + * If this flag is set, the server will return an undeliverable message with a Return method. + * If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed. + * <p/> + * If the header is present rabbitmq.IMMEDIATE it will override this option. + */ public void setImmediate(boolean immediate) { this.immediate = immediate; } - /** - * Get the configurer for setting the queue args in Channel.queueDeclare - * @return - */ public ArgsConfigurer getQueueArgsConfigurer() { return queueArgsConfigurer; } /** * Set the configurer for setting the queue args in Channel.queueDeclare - * @param queueArgsConfigurer the queue args configurer */ public void setQueueArgsConfigurer(ArgsConfigurer queueArgsConfigurer) { this.queueArgsConfigurer = queueArgsConfigurer; } - /** - * Get the configurer for setting the exchange args in Channel.exchangeDeclare - * @return - */ public ArgsConfigurer getExchangeArgsConfigurer() { return exchangeArgsConfigurer; } /** * Set the configurer for setting the exchange args in Channel.exchangeDeclare - * @param queueArgsConfigurer the queue args configurer */ public void setExchangeArgsConfigurer(ArgsConfigurer exchangeArgsConfigurer) { this.exchangeArgsConfigurer = exchangeArgsConfigurer;