This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.7.x by this push:
new f77a9c8 CAMEL-16707: rabbitmq - connection leak and a new option (#5661) (#5668)
f77a9c8 is described below
commit f77a9c88c1a9d1df91a77b0606934cced9ca9e45
Author: rastislavpapp <[email protected]>
AuthorDate: Mon Jun 14 20:59:16 2021 +0200
CAMEL-16707: rabbitmq - connection leak and a new option (#5661) (#5668)
* CAMEL-16707: fix rabbitmq connection leak
* CAMEL-16707: add rabbitmq consumer option to disable recovery from
exception during declaration of exchanges/queues
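
The two changes listed above can be illustrated with a minimal, self-contained sketch. It is not the Camel code: Channel, Declarer, FakeChannel and declareOrClose are hypothetical stand-ins introduced here so the example runs without a broker, and a plain RuntimeException stands in for RuntimeCamelException. It only mirrors the control flow of the RabbitConsumer hunk in this commit: close the channel when declaration fails, then either rethrow the IOException (recovery enabled) or wrap it so startup is interrupted (recovery disabled).

```java
import java.io.IOException;

public class DeclareFailureSketch {

    /** Hypothetical stand-in for com.rabbitmq.client.Channel, reduced to what the sketch needs. */
    public interface Channel {
        boolean isOpen();

        void close() throws IOException;
    }

    /** Hypothetical stand-in for the declare step (declareExchangeAndQueue in the commit). */
    public interface Declarer {
        void declare(Channel channel) throws IOException;
    }

    /** In-memory channel that only records whether it has been closed. */
    public static final class FakeChannel implements Channel {
        private boolean open = true;

        @Override
        public boolean isOpen() {
            return open;
        }

        @Override
        public void close() {
            open = false;
        }
    }

    public static Channel declareOrClose(Channel channel, Declarer declarer, boolean recoverFromDeclareException)
            throws IOException {
        try {
            declarer.declare(channel);
        } catch (IOException e) {
            // Close the channel so a failed declaration does not leak it.
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception innerEx) {
                    e.addSuppressed(innerEx);
                }
            }
            if (recoverFromDeclareException) {
                // Recoverable: hand the original exception back to the caller.
                throw e;
            }
            // Unrecoverable: the commit throws RuntimeCamelException at this point.
            throw new RuntimeException("Unrecoverable error when attempting to declare exchange or queue", e);
        }
        return channel;
    }

    public static void main(String[] args) {
        FakeChannel channel = new FakeChannel();
        try {
            declareOrClose(channel, c -> {
                throw new IOException("incompatible exchange type");
            }, false);
        } catch (RuntimeException e) {
            System.out.println("startup interrupted: " + e.getMessage());
        } catch (IOException e) {
            System.out.println("recoverable: " + e.getMessage());
        }
        System.out.println("channel open after failure: " + channel.isOpen());
    }
}
```

In both modes the channel is closed before the exception propagates. The flag only decides which exception type the caller sees.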
---
.../camel/catalog/docs/rabbitmq-component.adoc | 6 +-
components/camel-rabbitmq/pom.xml | 5 +
.../rabbitmq/RabbitMQComponentConfigurer.java | 6 ++
.../rabbitmq/RabbitMQEndpointConfigurer.java | 6 ++
.../rabbitmq/RabbitMQEndpointUriFactory.java | 3 +-
.../apache/camel/component/rabbitmq/rabbitmq.json | 2 +
.../src/main/docs/rabbitmq-component.adoc | 6 +-
.../camel/component/rabbitmq/RabbitConsumer.java | 18 +++-
.../component/rabbitmq/RabbitMQComponent.java | 18 ++++
.../camel/component/rabbitmq/RabbitMQConsumer.java | 1 -
.../camel/component/rabbitmq/RabbitMQEndpoint.java | 16 ++++
.../RabbitMQRecoverFromDeclareExceptionIT.java | 102 +++++++++++++++++++++
.../dsl/RabbitmqComponentBuilderFactory.java | 22 +++++
.../dsl/RabbitMQEndpointBuilderFactory.java | 43 +++++++++
.../modules/ROOT/pages/rabbitmq-component.adoc | 6 +-
15 files changed, 251 insertions(+), 9 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
index f43d384..915471a 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
@@ -57,7 +57,7 @@ determines the exchange the queue will be bound to.
== Options
// component options: START
-The RabbitMQ component supports 55 options, which are listed below.
+The RabbitMQ component supports 56 options, which are listed below.
@@ -88,6 +88,7 @@ The RabbitMQ component supports 55 options, which are listed
below.
| *prefetchEnabled* (consumer) | Enables the quality of service on the
RabbitMQConsumer side. You need to specify the option of prefetchSize,
prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchGlobal* (consumer) | If the settings should be applied to the
entire channel rather than each consumer You need to specify the option of
prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchSize* (consumer) | The maximum amount of content (measured in
octets) that the server will deliver, 0 if unlimited. You need to specify the
option of prefetchSize, prefetchCount, prefetchGlobal at the same time | | int
+| *recoverFromDeclareException* (consumer) | Decides whether an exception
during declaration of exchanges or queues is recoverable or not. If the option
is false, camel will throw an exception when starting the consumer, which will
interrupt application startup (e.g. in the case when the exchange / queue is
already declared in RabbitMQ and has incompatible configuration). If set to
true, the consumer will try to reconnect periodically. | false | boolean
| *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with
a fixed number of threads. This setting allows you to set that number of
threads. | 10 | int
| *additionalHeaders* (producer) | Map of additional headers. These headers
will be set only when the 'allowCustomHeaders' is set to true | | Map
| *additionalProperties* (producer) | Map of additional properties. These are
standard RabbitMQ properties as defined in
com.rabbitmq.client.AMQP.BasicProperties The map keys should be from
org.apache.camel.component.rabbitmq.RabbitMQConstants. Any other keys will be
ignored. When the message already contains these headers they will be given
precedence over these properties. | | Map
@@ -142,7 +143,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (66 parameters):
+=== Query Parameters (67 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -179,6 +180,7 @@ with the following path and query parameters:
| *prefetchEnabled* (consumer) | Enables the quality of service on the
RabbitMQConsumer side. You need to specify the option of prefetchSize,
prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchGlobal* (consumer) | If the settings should be applied to the
entire channel rather than each consumer You need to specify the option of
prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchSize* (consumer) | The maximum amount of content (measured in
octets) that the server will deliver, 0 if unlimited. You need to specify the
option of prefetchSize, prefetchCount, prefetchGlobal at the same time | | int
+| *recoverFromDeclareException* (consumer) | Decides whether an exception
during declaration of exchanges or queues is recoverable or not. If the option
is false, camel will throw an exception when starting the consumer, which will
interrupt application startup (e.g. in the case when the exchange / queue is
already declared in RabbitMQ and has incompatible configuration). If set to
true, the consumer will try to reconnect periodically. | true | boolean
| *reQueue* (consumer) | This is used by the consumer to control rejection of
the message. When the consumer is complete processing the exchange, and if the
exchange failed, then the consumer is going to reject the message from the
RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value
of the header will be used, otherwise this endpoint value is used as fallback.
If the value is false (by default) then the message is discarded/dead-lettered.
If the value is true, t [...]
| *exceptionHandler* (consumer) | To let the consumer use a custom
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this
option is not in use. By default the consumer will deal with exceptions, that
will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer
creates an exchange. There are 3 enums and the value can be one of: InOnly,
InOut, InOptionalOut | | ExchangePattern
diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml
index 712fb04..3996838 100644
--- a/components/camel-rabbitmq/pom.xml
+++ b/components/camel-rabbitmq/pom.xml
@@ -117,6 +117,11 @@
<artifactId>camel-http</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQComponentConfigurer.java b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQComponentConfigurer.java
index d381c07..61cb92b 100644
--- a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQComponentConfigurer.java
+++ b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQComponentConfigurer.java
@@ -91,6 +91,8 @@ public class RabbitMQComponentConfigurer extends
PropertyConfigurerSupport imple
case "publisherAcknowledgements":
target.setPublisherAcknowledgements(property(camelContext, boolean.class,
value)); return true;
case "publisheracknowledgementstimeout":
case "publisherAcknowledgementsTimeout":
target.setPublisherAcknowledgementsTimeout(property(camelContext, long.class,
value)); return true;
+ case "recoverfromdeclareexception":
+ case "recoverFromDeclareException": target.setRecoverFromDeclareException(property(camelContext, boolean.class, value)); return true;
case "requesttimeout":
case "requestTimeout": target.setRequestTimeout(property(camelContext,
long.class, value)); return true;
case "requesttimeoutcheckerinterval":
@@ -196,6 +198,8 @@ public class RabbitMQComponentConfigurer extends
PropertyConfigurerSupport imple
case "publisherAcknowledgements": return boolean.class;
case "publisheracknowledgementstimeout":
case "publisherAcknowledgementsTimeout": return long.class;
+ case "recoverfromdeclareexception":
+ case "recoverFromDeclareException": return boolean.class;
case "requesttimeout":
case "requestTimeout": return long.class;
case "requesttimeoutcheckerinterval":
@@ -302,6 +306,8 @@ public class RabbitMQComponentConfigurer extends
PropertyConfigurerSupport imple
case "publisherAcknowledgements": return
target.isPublisherAcknowledgements();
case "publisheracknowledgementstimeout":
case "publisherAcknowledgementsTimeout": return
target.getPublisherAcknowledgementsTimeout();
+ case "recoverfromdeclareexception":
+ case "recoverFromDeclareException": return target.isRecoverFromDeclareException();
case "requesttimeout":
case "requestTimeout": return target.getRequestTimeout();
case "requesttimeoutcheckerinterval":
diff --git a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java
index 17dd65c..faca916 100644
--- a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java
+++ b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java
@@ -106,6 +106,8 @@ public class RabbitMQEndpointConfigurer extends
PropertyConfigurerSupport implem
case "queue": target.setQueue(property(camelContext,
java.lang.String.class, value)); return true;
case "requeue":
case "reQueue": target.setReQueue(property(camelContext,
boolean.class, value)); return true;
+ case "recoverfromdeclareexception":
+ case "recoverFromDeclareException": target.setRecoverFromDeclareException(property(camelContext, boolean.class, value)); return true;
case "requesttimeout":
case "requestTimeout": target.setRequestTimeout(property(camelContext,
long.class, value)); return true;
case "requesttimeoutcheckerinterval":
@@ -231,6 +233,8 @@ public class RabbitMQEndpointConfigurer extends
PropertyConfigurerSupport implem
case "queue": return java.lang.String.class;
case "requeue":
case "reQueue": return boolean.class;
+ case "recoverfromdeclareexception":
+ case "recoverFromDeclareException": return boolean.class;
case "requesttimeout":
case "requestTimeout": return long.class;
case "requesttimeoutcheckerinterval":
@@ -357,6 +361,8 @@ public class RabbitMQEndpointConfigurer extends
PropertyConfigurerSupport implem
case "queue": return target.getQueue();
case "requeue":
case "reQueue": return target.isReQueue();
+ case "recoverfromdeclareexception":
+ case "recoverFromDeclareException": return target.isRecoverFromDeclareException();
case "requesttimeout":
case "requestTimeout": return target.getRequestTimeout();
case "requesttimeoutcheckerinterval":
diff --git a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java
index 77cbede..d352ca1 100644
--- a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java
+++ b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class RabbitMQEndpointUriFactory extends
org.apache.camel.support.compone
private static final Set<String> PROPERTY_NAMES;
private static final Set<String> SECRET_PROPERTY_NAMES;
static {
- Set<String> props = new HashSet<>(67);
+ Set<String> props = new HashSet<>(68);
props.add("prefetchCount");
props.add("publisherAcknowledgementsTimeout");
props.add("addresses");
@@ -50,6 +50,7 @@ public class RabbitMQEndpointUriFactory extends
org.apache.camel.support.compone
props.add("concurrentConsumers");
props.add("guaranteedDeliveries");
props.add("vhost");
+ props.add("recoverFromDeclareException");
props.add("lazyStartProducer");
props.add("threadPoolSize");
props.add("deadLetterQueue");
diff --git a/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json b/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json
index 784d99b..b071691 100644
--- a/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json
+++ b/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json
@@ -46,6 +46,7 @@
"prefetchEnabled": { "kind": "property", "displayName": "Prefetch
Enabled", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Enables the quality of
service on the RabbitMQConsumer side. You need to specify the option of
prefetchSize, prefetchCount, prefetchGlobal at the same time" },
"prefetchGlobal": { "kind": "property", "displayName": "Prefetch Global",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "description": "If the settings should be applied
to the entire channel rather than each consumer You need to specify the option
of prefetchSize, prefetchCount, prefetchGlobal at the same time" },
"prefetchSize": { "kind": "property", "displayName": "Prefetch Size",
"group": "consumer", "label": "consumer", "required": false, "type": "integer",
"javaType": "int", "deprecated": false, "autowired": false, "secret": false,
"description": "The maximum amount of content (measured in octets) that the
server will deliver, 0 if unlimited. You need to specify the option of
prefetchSize, prefetchCount, prefetchGlobal at the same time" },
+ "recoverFromDeclareException": { "kind": "property", "displayName":
"Recover From Declare Exception", "group": "consumer", "label": "consumer",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": false,
"description": "Decides whether an exception during declaration of exchanges or
queues is recoverable or not. If the option is false, camel will throw an
exception when starting the consumer, which will [...]
"threadPoolSize": { "kind": "property", "displayName": "Thread Pool Size",
"group": "consumer (advanced)", "label": "consumer,advanced", "required":
false, "type": "integer", "javaType": "int", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": 10, "description": "The consumer uses a
Thread Pool Executor with a fixed number of threads. This setting allows you to
set that number of threads." },
"additionalHeaders": { "kind": "property", "displayName": "Additional
Headers", "group": "producer", "label": "producer", "required": false, "type":
"object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>",
"deprecated": false, "autowired": false, "secret": false, "description": "Map
of additional headers. These headers will be set only when the
'allowCustomHeaders' is set to true" },
"additionalProperties": { "kind": "property", "displayName": "Additional
Properties", "group": "producer", "label": "producer", "required": false,
"type": "object", "javaType": "java.util.Map<java.lang.String,
java.lang.Object>", "deprecated": false, "autowired": false, "secret": false,
"description": "Map of additional properties. These are standard RabbitMQ
properties as defined in com.rabbitmq.client.AMQP.BasicProperties The map keys
should be from org.apache.camel.component.rabbi [...]
@@ -111,6 +112,7 @@
"prefetchEnabled": { "kind": "parameter", "displayName": "Prefetch
Enabled", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Enables the quality of
service on the RabbitMQConsumer side. You need to specify the option of
prefetchSize, prefetchCount, prefetchGlobal at the same time" },
"prefetchGlobal": { "kind": "parameter", "displayName": "Prefetch Global",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "description": "If the settings should be applied
to the entire channel rather than each consumer You need to specify the option
of prefetchSize, prefetchCount, prefetchGlobal at the same time" },
"prefetchSize": { "kind": "parameter", "displayName": "Prefetch Size",
"group": "consumer", "label": "consumer", "required": false, "type": "integer",
"javaType": "int", "deprecated": false, "autowired": false, "secret": false,
"description": "The maximum amount of content (measured in octets) that the
server will deliver, 0 if unlimited. You need to specify the option of
prefetchSize, prefetchCount, prefetchGlobal at the same time" },
+ "recoverFromDeclareException": { "kind": "parameter", "displayName":
"Recover From Declare Exception", "group": "consumer", "label": "consumer",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": true,
"description": "Decides whether an exception during declaration of exchanges or
queues is recoverable or not. If the option is false, camel will throw an
exception when starting the consumer, which will [...]
"reQueue": { "kind": "parameter", "displayName": "Re Queue", "group":
"consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "description": "This is used by the consumer to
control rejection of the message. When the consumer is complete processing the
exchange, and if the exchange failed, then the consumer is going to reject the
message from the RabbitMQ broker. If [...]
"exceptionHandler": { "kind": "parameter", "displayName": "Exception
Handler", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By default the
con [...]
"exchangePattern": { "kind": "parameter", "displayName": "Exchange
Pattern", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut",
"InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false,
"description": "Sets the exchange pattern when the consumer creates an
exchange." },
diff --git a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
index f43d384..915471a 100644
--- a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
+++ b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
@@ -57,7 +57,7 @@ determines the exchange the queue will be bound to.
== Options
// component options: START
-The RabbitMQ component supports 55 options, which are listed below.
+The RabbitMQ component supports 56 options, which are listed below.
@@ -88,6 +88,7 @@ The RabbitMQ component supports 55 options, which are listed
below.
| *prefetchEnabled* (consumer) | Enables the quality of service on the
RabbitMQConsumer side. You need to specify the option of prefetchSize,
prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchGlobal* (consumer) | If the settings should be applied to the
entire channel rather than each consumer You need to specify the option of
prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchSize* (consumer) | The maximum amount of content (measured in
octets) that the server will deliver, 0 if unlimited. You need to specify the
option of prefetchSize, prefetchCount, prefetchGlobal at the same time | | int
+| *recoverFromDeclareException* (consumer) | Decides whether an exception
during declaration of exchanges or queues is recoverable or not. If the option
is false, camel will throw an exception when starting the consumer, which will
interrupt application startup (e.g. in the case when the exchange / queue is
already declared in RabbitMQ and has incompatible configuration). If set to
true, the consumer will try to reconnect periodically. | false | boolean
| *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with
a fixed number of threads. This setting allows you to set that number of
threads. | 10 | int
| *additionalHeaders* (producer) | Map of additional headers. These headers
will be set only when the 'allowCustomHeaders' is set to true | | Map
| *additionalProperties* (producer) | Map of additional properties. These are
standard RabbitMQ properties as defined in
com.rabbitmq.client.AMQP.BasicProperties The map keys should be from
org.apache.camel.component.rabbitmq.RabbitMQConstants. Any other keys will be
ignored. When the message already contains these headers they will be given
precedence over these properties. | | Map
@@ -142,7 +143,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (66 parameters):
+=== Query Parameters (67 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -179,6 +180,7 @@ with the following path and query parameters:
| *prefetchEnabled* (consumer) | Enables the quality of service on the
RabbitMQConsumer side. You need to specify the option of prefetchSize,
prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchGlobal* (consumer) | If the settings should be applied to the
entire channel rather than each consumer You need to specify the option of
prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchSize* (consumer) | The maximum amount of content (measured in
octets) that the server will deliver, 0 if unlimited. You need to specify the
option of prefetchSize, prefetchCount, prefetchGlobal at the same time | | int
+| *recoverFromDeclareException* (consumer) | Decides whether an exception
during declaration of exchanges or queues is recoverable or not. If the option
is false, camel will throw an exception when starting the consumer, which will
interrupt application startup (e.g. in the case when the exchange / queue is
already declared in RabbitMQ and has incompatible configuration). If set to
true, the consumer will try to reconnect periodically. | true | boolean
| *reQueue* (consumer) | This is used by the consumer to control rejection of
the message. When the consumer is complete processing the exchange, and if the
exchange failed, then the consumer is going to reject the message from the
RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value
of the header will be used, otherwise this endpoint value is used as fallback.
If the value is false (by default) then the message is discarded/dead-lettered.
If the value is true, t [...]
| *exceptionHandler* (consumer) | To let the consumer use a custom
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this
option is not in use. By default the consumer will deal with exceptions, that
will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer
creates an exchange. There are 3 enums and the value can be one of: InOnly,
InOut, InOptionalOut | | ExchangePattern
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index f1bd416..b852b74 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -380,7 +380,23 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
// This really only needs to be called on the first consumer or on
// reconnections.
if (consumer.getEndpoint().isDeclare()) {
- consumer.getEndpoint().declareExchangeAndQueue(channel);
+ try {
+ consumer.getEndpoint().declareExchangeAndQueue(channel);
+ } catch (IOException e) {
+ if (channel != null && channel.isOpen()) {
+ try {
+ channel.close();
+ } catch (Exception innerEx) {
+ e.addSuppressed(innerEx);
+ }
+ }
+ if (this.consumer.getEndpoint().isRecoverFromDeclareException()) {
+ throw e;
+ } else {
+ throw new RuntimeCamelException(
+ "Unrecoverable error when attempting to declare exchange or queue for " + consumer, e);
+ }
+ }
}
return channel;
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 4f6f97f..f2c1873 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -148,6 +148,8 @@ public class RabbitMQComponent extends DefaultComponent {
private Map<String, Object> clientProperties;
@Metadata(label = "advanced")
private ExceptionHandler connectionFactoryExceptionHandler;
+ @Metadata(label = "consumer")
+ private boolean recoverFromDeclareException = true;
public RabbitMQComponent() {
}
@@ -270,6 +272,7 @@ public class RabbitMQComponent extends DefaultComponent {
endpoint.setDeadLetterRoutingKey(getDeadLetterRoutingKey());
endpoint.setAllowNullHeaders(isAllowNullHeaders());
endpoint.setConnectionFactoryExceptionHandler(getConnectionFactoryExceptionHandler());
+ endpoint.setRecoverFromDeclareException(isRecoverFromDeclareException());
if (LOG.isDebugEnabled()) {
LOG.debug("Creating RabbitMQEndpoint with host {}:{} and
exchangeName: {}",
@@ -910,4 +913,19 @@ public class RabbitMQComponent extends DefaultComponent {
public void setConnectionFactoryExceptionHandler(ExceptionHandler
connectionFactoryExceptionHandler) {
this.connectionFactoryExceptionHandler =
connectionFactoryExceptionHandler;
}
+
+ public boolean isRecoverFromDeclareException() {
+ return recoverFromDeclareException;
+ }
+
+ /**
+ * Decides whether an exception during declaration of exchanges or queues is recoverable or not. If the option is
+ * false, camel will throw an exception when starting the consumer, which will interrupt application startup (e.g.
+ * in the case when the exchange / queue is already declared in RabbitMQ and has incompatible configuration). If set
+ * to true, the consumer will try to reconnect periodically.
+ */
+ public void setRecoverFromDeclareException(boolean recoverFromDeclareException) {
+ this.recoverFromDeclareException = recoverFromDeclareException;
+ }
+
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 63b044d..d31a868 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -79,7 +79,6 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
openConnection();
return this.conn;
} else {
- openConnection();
return this.conn;
}
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 7f865de..e976775 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -186,6 +186,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
private boolean allowMessageBodySerialization;
@UriParam(label = "consumer")
private boolean reQueue;
+ @UriParam(label = "consumer", defaultValue = "true")
+ private boolean recoverFromDeclareException = true;
// camel-jms supports this setting but it is not currently configurable in
// camel-rabbitmq
private boolean useMessageIDAsCorrelationID = true;
@@ -705,6 +707,20 @@ public class RabbitMQEndpoint extends DefaultEndpoint
implements AsyncEndpoint {
}
/**
+ * Decides whether an exception during declaration of exchanges or queues is recoverable or not. If the option is
+ * false, camel will throw an exception when starting the consumer, which will interrupt application startup (e.g.
+ * in the case when the exchange / queue is already declared in RabbitMQ and has incompatible configuration). If set
+ * to true, the consumer will try to reconnect periodically.
+ */
+ public boolean isRecoverFromDeclareException() {
+ return recoverFromDeclareException;
+ }
+
+ public void setRecoverFromDeclareException(boolean recoverFromDeclareException) {
+ this.recoverFromDeclareException = recoverFromDeclareException;
+ }
+
+ /**
* If the option is true, camel declare the exchange and queue name and bind them together. If the option is false,
* camel won't declare the exchange and queue name on the server.
*/
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRecoverFromDeclareExceptionIT.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRecoverFromDeclareExceptionIT.java
new file mode 100644
index 0000000..ad4ef8d
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRecoverFromDeclareExceptionIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.integration;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.rabbitmq.RabbitMQComponent;
+import org.apache.camel.test.infra.rabbitmq.services.ConnectionProperties;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_METHOD;
+
+/**
+ * Integration test to check whether an exception during declaring exchanges/queues during startup of camel context
+ * behaves according to the value of {@link RabbitMQComponent#isRecoverFromDeclareException()}.
+ */
+@TestInstance(PER_METHOD)
+public class RabbitMQRecoverFromDeclareExceptionIT extends AbstractRabbitMQIntTest {
+
+ private static final String EXCHANGE = "testExchange";
+
+ private com.rabbitmq.client.Connection connection;
+ private com.rabbitmq.client.Channel channel;
+
+ @Override
+ public boolean isUseAdviceWith() {
+ return true;
+ }
+
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ connection = connection();
+ channel = connection.createChannel();
+ channel.exchangeDeclare(EXCHANGE, "direct");
+
+ super.setUp();
+
+ }
+
+ @Override
+ @AfterEach
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ channel.abort();
+ connection.abort();
+ }
+
+ @Test
+ void testWrongExchangeTypeInterruptsStartupWhenRecoveryOff() throws Exception {
+ context.addRoutes(createRouteWithWrongExchangeType(false));
+ assertThatExceptionOfType(RuntimeCamelException.class)
+ .isThrownBy(() -> context.start())
+ .withMessageStartingWith("Unrecoverable error when attempting to declare exchange or queue");
+ }
+
+ @Test
+ void testWrongExchangeTypeDoesNotInterruptStartup() throws Exception {
+ context.addRoutes(createRouteWithWrongExchangeType(true));
+ assertThatNoException().isThrownBy(() -> context.start());
+ }
+
+ private RoutesBuilder createRouteWithWrongExchangeType(boolean recoverFromDeclareException) {
+ ConnectionProperties connectionProperties = service.connectionProperties();
+ String rabbitMQEndpoint = String.format("rabbitmq:localhost:%d/%s?exchangeType=fanout" +
+ "&username=%s&password=%s" +
+ "&queue=q1" +
+ "&routingKey=rk1" +
+ "&recoverFromDeclareException=%b",
+ connectionProperties.port(), EXCHANGE, connectionProperties.username(), connectionProperties.password(),
+ recoverFromDeclareException);
+
+ return new RouteBuilder(context) {
+ @Override
+ public void configure() {
+ from(rabbitMQEndpoint).id("consumingRoute").log("Receiving message");
+ }
+ };
+ }
+
+}
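
Note on the test above: the two test cases differ only in the value of the
recoverFromDeclareException query parameter on the consumer URI. A minimal,
dependency-free sketch of how such a URI can be assembled follows. The class
RecoverFromDeclareUriSketch and its helper consumerUri are hypothetical names
introduced purely for illustration; they are not part of camel-rabbitmq. The
port 5672 is an assumption (the usual AMQP default), whereas the test takes the
port from its test infrastructure.

```java
public class RecoverFromDeclareUriSketch {

    // Hypothetical helper (not a Camel API): formats a rabbitmq consumer URI
    // in the same shape the integration test uses, minus the credentials.
    public static String consumerUri(String host, int port, String exchange,
                                     String queue, boolean recoverFromDeclareException) {
        return String.format(
                "rabbitmq:%s:%d/%s?queue=%s&recoverFromDeclareException=%b",
                host, port, exchange, queue, recoverFromDeclareException);
    }

    public static void main(String[] args) {
        // false: per the option description, a declare failure is thrown at consumer start
        System.out.println(consumerUri("localhost", 5672, "testExchange", "q1", false));
        // prints: rabbitmq:localhost:5672/testExchange?queue=q1&recoverFromDeclareException=false
    }
}
```

The sketch only builds the string. The option takes effect once the URI is
passed to from(...) in a RouteBuilder, as the test does; with the value false
the test expects context.start() to fail with a RuntimeCamelException.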
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/RabbitmqComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/RabbitmqComponentBuilderFactory.java
index 717fc1e..98152ab 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/RabbitmqComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/RabbitmqComponentBuilderFactory.java
@@ -443,6 +443,27 @@ public interface RabbitmqComponentBuilderFactory {
return this;
}
/**
+ * Decides whether an exception during declaration of exchanges or
+ * queues is recoverable or not. If the option is false, camel will
+ * throw an exception when starting the consumer, which will interrupt
+ * application startup (e.g. in the case when the exchange / queue is
+ * already declared in RabbitMQ and has incompatible configuration). If
+ * set to true, the consumer will try to reconnect periodically.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: consumer
+ *
+ * @param recoverFromDeclareException the value to set
+ * @return the dsl builder
+ */
+ default RabbitmqComponentBuilder recoverFromDeclareException(
+ boolean recoverFromDeclareException) {
+ doSetProperty("recoverFromDeclareException", recoverFromDeclareException);
+ return this;
+ }
+ /**
* The consumer uses a Thread Pool Executor with a fixed number of
* threads. This setting allows you to set that number of threads.
*
@@ -1018,6 +1039,7 @@ public interface RabbitmqComponentBuilderFactory {
case "prefetchEnabled": ((RabbitMQComponent) component).setPrefetchEnabled((boolean) value); return true;
case "prefetchGlobal": ((RabbitMQComponent) component).setPrefetchGlobal((boolean) value); return true;
case "prefetchSize": ((RabbitMQComponent) component).setPrefetchSize((int) value); return true;
+ case "recoverFromDeclareException": ((RabbitMQComponent) component).setRecoverFromDeclareException((boolean) value); return true;
case "threadPoolSize": ((RabbitMQComponent) component).setThreadPoolSize((int) value); return true;
case "additionalHeaders": ((RabbitMQComponent) component).setAdditionalHeaders((java.util.Map) value); return true;
case "additionalProperties": ((RabbitMQComponent) component).setAdditionalProperties((java.util.Map) value); return true;
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
index 6f9f0e0..8faa5ce 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
@@ -910,6 +910,49 @@ public interface RabbitMQEndpointBuilderFactory {
return this;
}
/**
+ * Decides whether an exception during declaration of exchanges or
+ * queues is recoverable or not. If the option is false, camel will
+ * throw an exception when starting the consumer, which will interrupt
+ * application startup (e.g. in the case when the exchange / queue is
+ * already declared in RabbitMQ and has incompatible configuration). If
+ * set to true, the consumer will try to reconnect periodically.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: true
+ * Group: consumer
+ *
+ * @param recoverFromDeclareException the value to set
+ * @return the dsl builder
+ */
+ default RabbitMQEndpointConsumerBuilder recoverFromDeclareException(
+ boolean recoverFromDeclareException) {
+ doSetProperty("recoverFromDeclareException", recoverFromDeclareException);
+ return this;
+ }
+ /**
+ * Decides whether an exception during declaration of exchanges or
+ * queues is recoverable or not. If the option is false, camel will
+ * throw an exception when starting the consumer, which will interrupt
+ * application startup (e.g. in the case when the exchange / queue is
+ * already declared in RabbitMQ and has incompatible configuration). If
+ * set to true, the consumer will try to reconnect periodically.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: true
+ * Group: consumer
+ *
+ * @param recoverFromDeclareException the value to set
+ * @return the dsl builder
+ */
+ default RabbitMQEndpointConsumerBuilder recoverFromDeclareException(
+ String recoverFromDeclareException) {
+ doSetProperty("recoverFromDeclareException", recoverFromDeclareException);
+ return this;
+ }
+ /**
* This is used by the consumer to control rejection of the message.
* When the consumer is complete processing the exchange, and if the
* exchange failed, then the consumer is going to reject the message
diff --git a/docs/components/modules/ROOT/pages/rabbitmq-component.adoc b/docs/components/modules/ROOT/pages/rabbitmq-component.adoc
index 7461f53..cd2c55b 100644
--- a/docs/components/modules/ROOT/pages/rabbitmq-component.adoc
+++ b/docs/components/modules/ROOT/pages/rabbitmq-component.adoc
@@ -59,7 +59,7 @@ determines the exchange the queue will be bound to.
== Options
// component options: START
-The RabbitMQ component supports 55 options, which are listed below.
+The RabbitMQ component supports 56 options, which are listed below.
@@ -90,6 +90,7 @@ The RabbitMQ component supports 55 options, which are listed below.
| *prefetchEnabled* (consumer) | Enables the quality of service on the RabbitMQConsumer side. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchGlobal* (consumer) | If the settings should be applied to the entire channel rather than each consumer You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchSize* (consumer) | The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | | int
+| *recoverFromDeclareException* (consumer) | Decides whether an exception during declaration of exchanges or queues is recoverable or not. If the option is false, camel will throw an exception when starting the consumer, which will interrupt application startup (e.g. in the case when the exchange / queue is already declared in RabbitMQ and has incompatible configuration). If set to true, the consumer will try to reconnect periodically. | false | boolean
| *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads. | 10 | int
| *additionalHeaders* (producer) | Map of additional headers. These headers will be set only when the 'allowCustomHeaders' is set to true | | Map
| *additionalProperties* (producer) | Map of additional properties. These are standard RabbitMQ properties as defined in com.rabbitmq.client.AMQP.BasicProperties The map keys should be from org.apache.camel.component.rabbitmq.RabbitMQConstants. Any other keys will be ignored. When the message already contains these headers they will be given precedence over these properties. | | Map
@@ -144,7 +145,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (66 parameters):
+=== Query Parameters (67 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -181,6 +182,7 @@ with the following path and query parameters:
| *prefetchEnabled* (consumer) | Enables the quality of service on the RabbitMQConsumer side. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchGlobal* (consumer) | If the settings should be applied to the entire channel rather than each consumer You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchSize* (consumer) | The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | | int
+| *recoverFromDeclareException* (consumer) | Decides whether an exception during declaration of exchanges or queues is recoverable or not. If the option is false, camel will throw an exception when starting the consumer, which will interrupt application startup (e.g. in the case when the exchange / queue is already declared in RabbitMQ and has incompatible configuration). If set to true, the consumer will try to reconnect periodically. | true | boolean
| *reQueue* (consumer) | This is used by the consumer to control rejection of the message. When the consumer is complete processing the exchange, and if the exchange failed, then the consumer is going to reject the message from the RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value of the header will be used, otherwise this endpoint value is used as fallback. If the value is false (by default) then the message is discarded/dead-lettered. If the value is true, t [...]
| *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut | | ExchangePattern