This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch rabbitmq in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/rabbitmq by this push: new 7bb0060 CAMEL-16003: camel-spring-rabbitmq - New component using spring client 7bb0060 is described below commit 7bb00604f85a32effb1d5d4008357d573f60d637 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Jan 12 07:56:42 2021 +0100 CAMEL-16003: camel-spring-rabbitmq - New component using spring client --- .../apache/camel/catalog/components/rabbitmq.json | 2 +- .../camel/catalog/components/spring-rabbitmq.json | 17 ++++- .../catalog/docs/spring-rabbitmq-component.adoc | 9 ++- .../springrabbit/RabbitMQComponentConfigurer.java | 24 +++++++ .../springrabbit/RabbitMQEndpointConfigurer.java | 6 ++ .../springrabbit/RabbitMQEndpointUriFactory.java | 3 +- .../component/springrabbit/spring-rabbitmq.json | 5 ++ .../src/main/docs/spring-rabbitmq-component.adoc | 9 ++- .../DefaultListenerContainerFactory.java | 29 +++++++++ .../DefaultMessageListenerContainer.java | 16 +++++ .../springrabbit/ListenerContainerFactory.java | 33 ++++++++++ .../component/springrabbit/RabbitMQComponent.java | 62 ++++++++++++++++++ .../component/springrabbit/RabbitMQConsumer.java | 11 ++-- .../component/springrabbit/RabbitMQEndpoint.java | 45 +++++++------ .../src/test/resources/log4j2.properties | 6 +- .../dsl/SpringRabbitmqComponentBuilderFactory.java | 74 ++++++++++++++++++++++ .../dsl/RabbitMQEndpointBuilderFactory.java | 34 ++++++++++ .../ROOT/pages/spring-rabbitmq-component.adoc | 9 ++- 18 files changed, 355 insertions(+), 39 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/rabbitmq.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/rabbitmq.json index 076c3bf..6bb1f1c 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/rabbitmq.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/rabbitmq.json @@ -128,6 +128,7 @@ "mandatory": { "kind": "parameter", "displayName": "Mandatory", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the [...] "publisherAcknowledgements": { "kind": "parameter", "displayName": "Publisher Acknowledgements", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "When true, the message will be published with publisher acknowledgements turned on" }, "publisherAcknowledgementsTimeout": { "kind": "parameter", "displayName": "Publisher Acknowledgements Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "description": "The amount of time in milliseconds to wait for a basic.ack response from RabbitMQ server" }, + "allowMessageBodySerialization": { "kind": "parameter", "displayName": "Allow Message Body Serialization", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to allow Java serialization of the message body or not. If this value is true, the message body will be serialized on the producer side using Java serialization, if no type [...] "args": { "kind": "parameter", "displayName": "Args", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "arg.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "description": "Specify arguments for configuring the different RabbitMQ concepts, a different prefix is required for each: Exchange: arg.exchange. Queue: arg.queue. Binding: arg.binding. DLQ: a [...] "clientProperties": { "kind": "parameter", "displayName": "Client Properties", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "deprecated": false, "autowired": false, "secret": false, "description": "Connection client properties (client info used in negotiating with the server)" }, "connectionFactoryExceptionHandler": { "kind": "parameter", "displayName": "Connection Factory Exception Handler", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "com.rabbitmq.client.ExceptionHandler", "deprecated": false, "autowired": false, "secret": false, "description": "Custom rabbitmq ExceptionHandler for ConnectionFactory" }, @@ -141,7 +142,6 @@ "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported)." }, "topologyRecoveryEnabled": { "kind": "parameter", "displayName": "Topology Recovery Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "description": "Enables connection topology recovery (should topology recovery be performed)" }, "transferException": { "kind": "parameter", "displayName": "Transfer Exception", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "When true and an inOut Exchange failed on the consumer side send the caused Exception back in the response" }, - "allowMessageBodySerialization": { "kind": "parameter", "displayName": "Allow Message Body Serialization", "group": "allowMessageBodySerialization", "label": "allowMessageBodySerialization", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to allow Java serialization of the message body or not. If this value is true, the message body will be serialized on the producer [...] "password": { "kind": "parameter", "displayName": "Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "defaultValue": "guest", "description": "Password for authenticated access" }, "sslProtocol": { "kind": "parameter", "displayName": "Ssl Protocol", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Enables SSL on connection, accepted value are true, TLS and 'SSLv3" }, "trustManager": { "kind": "parameter", "displayName": "Trust Manager", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "javax.net.ssl.TrustManager", "deprecated": false, "autowired": false, "secret": false, "description": "Configure SSL trust manager, SSL should be enabled for this option to be effective" }, diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/spring-rabbitmq.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/spring-rabbitmq.json index be41f0f..61276f8 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/spring-rabbitmq.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/spring-rabbitmq.json @@ -22,10 +22,15 @@ "lenientProperties": false }, "componentProperties": { + "amqpAdmin": { "kind": "property", "displayName": "Amqp Admin", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.springframework.amqp.core.AmqpAdmin", "deprecated": false, "autowired": true, "secret": false, "description": "Optional AMQP Admin service to use for auto declaring elements (queues, exchanges, bindings)" }, "connectionFactory": { "kind": "property", "displayName": "Connection Factory", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.springframework.amqp.rabbit.connection.ConnectionFactory", "deprecated": false, "autowired": true, "secret": false, "description": "The connection factory to be use. A connection factory must be configured either on the component or endpoint." }, "testConnectionOnStartup": { "kind": "property", "displayName": "Test Connection On Startup", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether to test the connection on startup. This ensures that when Camel starts that all the JMS consumers have a valid connection to the JMS broker. If a connection cannot be granted then Camel [...] "autoStartup": { "kind": "property", "displayName": "Auto Startup", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Specifies whether the consumer container should auto-startup." }, "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...] + "deadLetterExchange": { "kind": "property", "displayName": "Dead Letter Exchange", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The name of the dead letter exchange" }, + "deadLetterExchangeType": { "kind": "property", "displayName": "Dead Letter Exchange Type", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "direct", "fanout", "headers", "topic" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "direct", "description": "The type of the dead letter exchange" }, + "deadLetterQueue": { "kind": "property", "displayName": "Dead Letter Queue", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The name of the dead letter queue" }, + "deadLetterRoutingKey": { "kind": "property", "displayName": "Dead Letter Routing Key", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The routing key for the dead letter exchange" }, "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during star [...] "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...] "messageConverter": { "kind": "property", "displayName": "Message Converter", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.springframework.amqp.support.converter.MessageConverter", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom MessageConverter so you can be in control how to map to\/from a org.springframework.amqp.core.Message." }, @@ -33,17 +38,27 @@ "headerFilterStrategy": { "kind": "property", "displayName": "Header Filter Strategy", "group": "filter", "label": "filter", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom org.apache.camel.spi.HeaderFilterStrategy to filter header to and from Camel message." } }, "properties": { - "exchangeName": { "kind": "path", "displayName": "Exchange Name", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The exchange name determines the exchange to which the produced messages will be sent to. In the case of consumers, the exchange name determines the exchange the queue will be bound to." }, + "exchangeName": { "kind": "path", "displayName": "Exchange Name", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The exchange name determines the exchange to which the produced messages will be sent to. In the case of consumers, the exchange name determines the exchange the queue will be bound to. Note: to use default exchange then do n [...] "connectionFactory": { "kind": "parameter", "displayName": "Connection Factory", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.springframework.amqp.rabbit.connection.ConnectionFactory", "deprecated": false, "autowired": false, "secret": false, "description": "The connection factory to be use. A connection factory must be configured either on the component or endpoint." }, + "disableReplyTo": { "kind": "parameter", "displayName": "Disable Reply To", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether Camel ignores the ReplyTo header in messages. If true, Camel does not send a reply back to the destination specified in the ReplyTo header. You can use this option if you want Camel to consume from a rout [...] "routingKey": { "kind": "parameter", "displayName": "Routing Key", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Routing key." }, "testConnectionOnStartup": { "kind": "parameter", "displayName": "Test Connection On Startup", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether to test the connection on startup. This ensures that when Camel starts that all the JMS consumers have a valid connection to the JMS broker. If a connection cannot be granted then Camel [...] "asyncConsumer": { "kind": "parameter", "displayName": "Async Consumer", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the consumer processes the Exchange asynchronously. If enabled then the consumer may pickup the next message from the queue, while the previous message is being processed asynchronously (by the Asynchronous [...] + "autoDeclare": { "kind": "parameter", "displayName": "Auto Declare", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Specifies whether the consumer should auto declare binding between exchange, queue and routing key when starting." }, "autoStartup": { "kind": "parameter", "displayName": "Auto Startup", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Specifies whether the consumer container should auto-startup." }, "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a m [...] + "deadLetterExchange": { "kind": "parameter", "displayName": "Dead Letter Exchange", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The name of the dead letter exchange" }, + "deadLetterExchangeType": { "kind": "parameter", "displayName": "Dead Letter Exchange Type", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "direct", "fanout", "headers", "topic" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "direct", "description": "The type of the dead letter exchange" }, + "deadLetterQueue": { "kind": "parameter", "displayName": "Dead Letter Queue", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The name of the dead letter queue" }, + "deadLetterRoutingKey": { "kind": "parameter", "displayName": "Dead Letter Routing Key", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The routing key for the dead letter exchange" }, + "exchangeType": { "kind": "parameter", "displayName": "Exchange Type", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "direct", "fanout", "headers", "topic" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "direct", "description": "The type of the exchange" }, "queues": { "kind": "parameter", "displayName": "Queues", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The queue(s) to use for consuming messages. Multiple queue names can be separated by comma." }, "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...] "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, + "correlationKey": { "kind": "parameter", "displayName": "Correlation Key", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom header for correlationId when doing request\/reply messaging." }, "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during sta [...] + "replyTimeout": { "kind": "parameter", "displayName": "Reply Timeout", "group": "producer", "label": "producer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "description": "Specify the timeout in milliseconds to be used when waiting for a reply message when doing request\/reply messaging. The default value is 5 seconds. A negative value indicates an indefinite timeout." }, + "args": { "kind": "parameter", "displayName": "Args", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "arg.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "description": "Specify arguments for configuring the different RabbitMQ concepts, a different prefix is required for each element: arg.exchange. arg.queue. arg.binding. arg.dlq.exchange. arg.dl [...] "messageConverter": { "kind": "parameter", "displayName": "Message Converter", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.springframework.amqp.support.converter.MessageConverter", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom MessageConverter so you can be in control how to map to\/from a org.springframework.amqp.core.Message." }, "messagePropertiesConverter": { "kind": "parameter", "displayName": "Message Properties Converter", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.springrabbit.MessagePropertiesConverter", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom MessagePropertiesConverter so you can be in control how to map to\/from a org.springframework.amqp.core.MessageProperties." }, "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported)." }, diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/spring-rabbitmq-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/spring-rabbitmq-component.adoc index 75ee636..04b35cf 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/spring-rabbitmq-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/spring-rabbitmq-component.adoc @@ -43,7 +43,7 @@ determines the exchange the queue will be bound to. == Options // component options: START -The Spring RabbitMQ component supports 14 options, which are listed below. +The Spring RabbitMQ component supports 18 options, which are listed below. @@ -59,6 +59,10 @@ The Spring RabbitMQ component supports 14 options, which are listed below. | *deadLetterExchangeType* (consumer) | The type of the dead letter exchange. There are 4 enums and the value can be one of: direct, fanout, headers, topic | direct | String | *deadLetterQueue* (consumer) | The name of the dead letter queue | | String | *deadLetterRoutingKey* (consumer) | The routing key for the dead letter exchange | | String +| *errorHandler* (consumer) | To use a custom ErrorHandler for handling exceptions from the message listener (consumer) | | ErrorHandler +| *listenerContainerFactory* (consumer) | To use a custom factory for creating and configuring ListenerContainer to be used by the consumer for receiving messages | | ListenerContainerFactory +| *prefetchCount* (consumer) | Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput. | 250 | int +| *shutdownTimeout* (consumer) | The time to wait for workers in milliseconds after the container is stopped. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout. | 5000 | long | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...] | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean | *messageConverter* (advanced) | To use a custom MessageConverter so you can be in control how to map to/from a org.springframework.amqp.core.Message. | | MessageConverter @@ -87,7 +91,7 @@ with the following path and query parameters: |=== -=== Query Parameters (22 parameters): +=== Query Parameters (23 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -110,6 +114,7 @@ with the following path and query parameters: | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut | | ExchangePattern | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...] +| *replyTimeout* (producer) | Specify the timeout in milliseconds to be used when waiting for a reply message when doing request/reply messaging. The default value is 5 seconds. A negative value indicates an indefinite timeout. | 5000 | long | *args* (advanced) | Specify arguments for configuring the different RabbitMQ concepts, a different prefix is required for each element: arg.exchange. arg.queue. arg.binding. arg.dlq.exchange. arg.dlq.queue. arg.dlq.binding. For example to declare a queue with message ttl argument: args=arg.queue.x-message-ttl=60000 | | Map | *messageConverter* (advanced) | To use a custom MessageConverter so you can be in control how to map to/from a org.springframework.amqp.core.Message. | | MessageConverter | *messagePropertiesConverter* (advanced) | To use a custom MessagePropertiesConverter so you can be in control how to map to/from a org.springframework.amqp.core.MessageProperties. | | MessagePropertiesConverter diff --git a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/RabbitMQComponentConfigurer.java b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/RabbitMQComponentConfigurer.java index e564bc2..0e48117 100644 --- a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/RabbitMQComponentConfigurer.java +++ b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/RabbitMQComponentConfigurer.java @@ -39,14 +39,22 @@ public class RabbitMQComponentConfigurer extends PropertyConfigurerSupport imple case "deadLetterQueue": target.setDeadLetterQueue(property(camelContext, java.lang.String.class, value)); return true; case "deadletterroutingkey": case "deadLetterRoutingKey": target.setDeadLetterRoutingKey(property(camelContext, java.lang.String.class, value)); return true; + case "errorhandler": + case "errorHandler": target.setErrorHandler(property(camelContext, org.springframework.util.ErrorHandler.class, value)); return true; case "headerfilterstrategy": case "headerFilterStrategy": target.setHeaderFilterStrategy(property(camelContext, org.apache.camel.spi.HeaderFilterStrategy.class, value)); return true; case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; + case "listenercontainerfactory": + case "listenerContainerFactory": target.setListenerContainerFactory(property(camelContext, org.apache.camel.component.springrabbit.ListenerContainerFactory.class, value)); return true; case "messageconverter": case "messageConverter": target.setMessageConverter(property(camelContext, org.springframework.amqp.support.converter.MessageConverter.class, value)); return true; case "messagepropertiesconverter": case "messagePropertiesConverter": target.setMessagePropertiesConverter(property(camelContext, org.apache.camel.component.springrabbit.MessagePropertiesConverter.class, value)); return true; + case "prefetchcount": + case "prefetchCount": target.setPrefetchCount(property(camelContext, int.class, value)); return true; + case "shutdowntimeout": + case "shutdownTimeout": target.setShutdownTimeout(property(camelContext, java.time.Duration.class, value).toMillis()); return true; case "testconnectiononstartup": case "testConnectionOnStartup": target.setTestConnectionOnStartup(property(camelContext, boolean.class, value)); return true; default: return false; @@ -79,14 +87,22 @@ public class RabbitMQComponentConfigurer extends PropertyConfigurerSupport imple case "deadLetterQueue": return java.lang.String.class; case "deadletterroutingkey": case "deadLetterRoutingKey": return java.lang.String.class; + case "errorhandler": + case "errorHandler": return org.springframework.util.ErrorHandler.class; case "headerfilterstrategy": case "headerFilterStrategy": return org.apache.camel.spi.HeaderFilterStrategy.class; case "lazystartproducer": case "lazyStartProducer": return boolean.class; + case "listenercontainerfactory": + case "listenerContainerFactory": return org.apache.camel.component.springrabbit.ListenerContainerFactory.class; case "messageconverter": case "messageConverter": return org.springframework.amqp.support.converter.MessageConverter.class; case "messagepropertiesconverter": case "messagePropertiesConverter": return org.apache.camel.component.springrabbit.MessagePropertiesConverter.class; + case "prefetchcount": + case "prefetchCount": return int.class; + case "shutdowntimeout": + case "shutdownTimeout": return long.class; case "testconnectiononstartup": case "testConnectionOnStartup": return boolean.class; default: return null; @@ -115,14 +131,22 @@ public class RabbitMQComponentConfigurer extends PropertyConfigurerSupport imple case "deadLetterQueue": return target.getDeadLetterQueue(); case "deadletterroutingkey": case "deadLetterRoutingKey": return target.getDeadLetterRoutingKey(); + case "errorhandler": + case "errorHandler": return target.getErrorHandler(); case "headerfilterstrategy": case "headerFilterStrategy": return target.getHeaderFilterStrategy(); case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); + case "listenercontainerfactory": + case "listenerContainerFactory": return target.getListenerContainerFactory(); case "messageconverter": case "messageConverter": return target.getMessageConverter(); case "messagepropertiesconverter": case "messagePropertiesConverter": return target.getMessagePropertiesConverter(); + case "prefetchcount": + case "prefetchCount": return target.getPrefetchCount(); + case "shutdowntimeout": + case "shutdownTimeout": return target.getShutdownTimeout(); case "testconnectiononstartup": case "testConnectionOnStartup": return target.isTestConnectionOnStartup(); default: return null; diff --git a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/RabbitMQEndpointConfigurer.java b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/RabbitMQEndpointConfigurer.java index c3c872c..50377af 100644 --- a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/RabbitMQEndpointConfigurer.java +++ b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/RabbitMQEndpointConfigurer.java @@ -55,6 +55,8 @@ public class RabbitMQEndpointConfigurer extends PropertyConfigurerSupport implem case "messagepropertiesconverter": case "messagePropertiesConverter": target.setMessagePropertiesConverter(property(camelContext, org.apache.camel.component.springrabbit.MessagePropertiesConverter.class, value)); return true; case "queues": target.setQueues(property(camelContext, java.lang.String.class, value)); return true; + case "replytimeout": + case "replyTimeout": target.setReplyTimeout(property(camelContext, java.time.Duration.class, value).toMillis()); return true; case "routingkey": case "routingKey": target.setRoutingKey(property(camelContext, java.lang.String.class, value)); return true; case "synchronous": target.setSynchronous(property(camelContext, boolean.class, value)); return true; @@ -102,6 +104,8 @@ public class RabbitMQEndpointConfigurer extends PropertyConfigurerSupport implem case "messagepropertiesconverter": case "messagePropertiesConverter": return org.apache.camel.component.springrabbit.MessagePropertiesConverter.class; case "queues": return java.lang.String.class; + case "replytimeout": + case "replyTimeout": return long.class; case "routingkey": case "routingKey": return java.lang.String.class; case "synchronous": return boolean.class; @@ -150,6 +154,8 @@ public class RabbitMQEndpointConfigurer extends PropertyConfigurerSupport implem case "messagepropertiesconverter": case "messagePropertiesConverter": return target.getMessagePropertiesConverter(); case "queues": return target.getQueues(); + case "replytimeout": + case "replyTimeout": return target.getReplyTimeout(); case "routingkey": case "routingKey": return target.getRoutingKey(); case "synchronous": return target.isSynchronous(); diff --git a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/RabbitMQEndpointUriFactory.java b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/RabbitMQEndpointUriFactory.java index ff14ae6..6707b6f 100644 --- a/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/RabbitMQEndpointUriFactory.java +++ b/components/camel-spring-rabbitmq/src/generated/java/org/apache/camel/component/springrabbit/RabbitMQEndpointUriFactory.java @@ -20,7 +20,7 @@ public class RabbitMQEndpointUriFactory extends org.apache.camel.support.compone private static final Set<String> PROPERTY_NAMES; private static final Set<String> SECRET_PROPERTY_NAMES; static { - Set<String> props = new HashSet<>(23); + Set<String> props = new HashSet<>(24); props.add("disableReplyTo"); props.add("asyncConsumer"); props.add("deadLetterExchange"); @@ -41,6 +41,7 @@ public class RabbitMQEndpointUriFactory extends org.apache.camel.support.compone props.add("queues"); props.add("messagePropertiesConverter"); props.add("exchangeName"); + props.add("replyTimeout"); props.add("exceptionHandler"); props.add("routingKey"); props.add("autoDeclare"); diff --git a/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json b/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json index 98ac6aa..6eed0e3 100644 --- a/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json +++ b/components/camel-spring-rabbitmq/src/generated/resources/org/apache/camel/component/springrabbit/spring-rabbitmq.json @@ -31,6 +31,10 @@ "deadLetterExchangeType": { "kind": "property", "displayName": "Dead Letter Exchange Type", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "direct", "fanout", "headers", "topic" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "direct", "description": "The type of the dead letter exchange" }, "deadLetterQueue": { "kind": "property", "displayName": "Dead Letter Queue", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The name of the dead letter queue" }, "deadLetterRoutingKey": { "kind": "property", "displayName": "Dead Letter Routing Key", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The routing key for the dead letter exchange" }, + "errorHandler": { "kind": "property", "displayName": "Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.springframework.util.ErrorHandler", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom ErrorHandler for handling exceptions from the message listener (consumer)" }, + "listenerContainerFactory": { "kind": "property", "displayName": "Listener Container Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.springrabbit.ListenerContainerFactory", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom factory for creating and configuring ListenerContainer to be used by the consumer for receiving messages" }, + "prefetchCount": { "kind": "property", "displayName": "Prefetch Count", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 250, "description": "Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput." }, + "shutdownTimeout": { "kind": "property", "displayName": "Shutdown Timeout", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "description": "The time to wait for workers in milliseconds after the container is stopped. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they c [...] "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during star [...] "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...] "messageConverter": { "kind": "property", "displayName": "Message Converter", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.springframework.amqp.support.converter.MessageConverter", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom MessageConverter so you can be in control how to map to\/from a org.springframework.amqp.core.Message." }, @@ -56,6 +60,7 @@ "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...] "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during sta [...] + "replyTimeout": { "kind": "parameter", "displayName": "Reply Timeout", "group": "producer", "label": "producer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "description": "Specify the timeout in milliseconds to be used when waiting for a reply message when doing request\/reply messaging. The default value is 5 seconds. A negative value indicates an indefinite timeout." }, "args": { "kind": "parameter", "displayName": "Args", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "arg.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "description": "Specify arguments for configuring the different RabbitMQ concepts, a different prefix is required for each element: arg.exchange. arg.queue. arg.binding. arg.dlq.exchange. arg.dl [...] "messageConverter": { "kind": "parameter", "displayName": "Message Converter", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.springframework.amqp.support.converter.MessageConverter", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom MessageConverter so you can be in control how to map to\/from a org.springframework.amqp.core.Message." }, "messagePropertiesConverter": { "kind": "parameter", "displayName": "Message Properties Converter", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.springrabbit.MessagePropertiesConverter", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom MessagePropertiesConverter so you can be in control how to map to\/from a org.springframework.amqp.core.MessageProperties." }, diff --git a/components/camel-spring-rabbitmq/src/main/docs/spring-rabbitmq-component.adoc b/components/camel-spring-rabbitmq/src/main/docs/spring-rabbitmq-component.adoc index 75ee636..04b35cf 100644 --- a/components/camel-spring-rabbitmq/src/main/docs/spring-rabbitmq-component.adoc +++ b/components/camel-spring-rabbitmq/src/main/docs/spring-rabbitmq-component.adoc @@ -43,7 +43,7 @@ determines the exchange the queue will be bound to. == Options // component options: START -The Spring RabbitMQ component supports 14 options, which are listed below. +The Spring RabbitMQ component supports 18 options, which are listed below. @@ -59,6 +59,10 @@ The Spring RabbitMQ component supports 14 options, which are listed below. | *deadLetterExchangeType* (consumer) | The type of the dead letter exchange. There are 4 enums and the value can be one of: direct, fanout, headers, topic | direct | String | *deadLetterQueue* (consumer) | The name of the dead letter queue | | String | *deadLetterRoutingKey* (consumer) | The routing key for the dead letter exchange | | String +| *errorHandler* (consumer) | To use a custom ErrorHandler for handling exceptions from the message listener (consumer) | | ErrorHandler +| *listenerContainerFactory* (consumer) | To use a custom factory for creating and configuring ListenerContainer to be used by the consumer for receiving messages | | ListenerContainerFactory +| *prefetchCount* (consumer) | Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput. | 250 | int +| *shutdownTimeout* (consumer) | The time to wait for workers in milliseconds after the container is stopped. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout. | 5000 | long | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...] | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean | *messageConverter* (advanced) | To use a custom MessageConverter so you can be in control how to map to/from a org.springframework.amqp.core.Message. | | MessageConverter @@ -87,7 +91,7 @@ with the following path and query parameters: |=== -=== Query Parameters (22 parameters): +=== Query Parameters (23 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -110,6 +114,7 @@ with the following path and query parameters: | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut | | ExchangePattern | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...] +| *replyTimeout* (producer) | Specify the timeout in milliseconds to be used when waiting for a reply message when doing request/reply messaging. The default value is 5 seconds. A negative value indicates an indefinite timeout. | 5000 | long | *args* (advanced) | Specify arguments for configuring the different RabbitMQ concepts, a different prefix is required for each element: arg.exchange. arg.queue. arg.binding. arg.dlq.exchange. arg.dlq.queue. arg.dlq.binding. For example to declare a queue with message ttl argument: args=arg.queue.x-message-ttl=60000 | | Map | *messageConverter* (advanced) | To use a custom MessageConverter so you can be in control how to map to/from a org.springframework.amqp.core.Message. | | MessageConverter | *messagePropertiesConverter* (advanced) | To use a custom MessagePropertiesConverter so you can be in control how to map to/from a org.springframework.amqp.core.MessageProperties. | | MessagePropertiesConverter diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultListenerContainerFactory.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultListenerContainerFactory.java new file mode 100644 index 0000000..e23b599 --- /dev/null +++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultListenerContainerFactory.java @@ -0,0 +1,29 @@ +package org.apache.camel.component.springrabbit; + +import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; + +public class DefaultListenerContainerFactory implements ListenerContainerFactory { + + @Override + public AbstractMessageListenerContainer createListenerContainer(RabbitMQEndpoint endpoint) { + DefaultMessageListenerContainer listener = new DefaultMessageListenerContainer(endpoint.getConnectionFactory()); + if (endpoint.getQueues() != null) { + listener.setQueueNames(endpoint.getQueues().split(",")); + } + + AmqpAdmin admin = endpoint.getComponent().getAmqpAdmin(); + if (endpoint.isAutoDeclare() && admin == null) { + admin = new RabbitAdmin(endpoint.getConnectionFactory()); + } + listener.setAutoDeclare(endpoint.isAutoDeclare()); + listener.setAmqpAdmin(admin); + if (endpoint.getComponent().getErrorHandler() != null) { + listener.setErrorHandler(endpoint.getComponent().getErrorHandler()); + } + listener.setPrefetchCount(endpoint.getComponent().getPrefetchCount()); + listener.setShutdownTimeout(endpoint.getComponent().getShutdownTimeout()); + return listener; + } +} diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultMessageListenerContainer.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultMessageListenerContainer.java index ed82e79..d0ed8ed 100644 --- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultMessageListenerContainer.java +++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultMessageListenerContainer.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.springrabbit; import org.springframework.amqp.core.AmqpAdmin; diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/ListenerContainerFactory.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/ListenerContainerFactory.java new file mode 100644 index 0000000..a9e94c7 --- /dev/null +++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/ListenerContainerFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.springrabbit; + +import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; + +/** + * Factory to create {@link AbstractMessageListenerContainer} + */ +public interface ListenerContainerFactory { + + /** + * Creates the listener container to use for the consumer. + * + * @param endpoint the endpoint + * @return the created and configured listener container + */ + AbstractMessageListenerContainer createListenerContainer(RabbitMQEndpoint endpoint); +} diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQComponent.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQComponent.java index 097f28c6c..2d0584b 100644 --- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQComponent.java +++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQComponent.java @@ -20,11 +20,16 @@ import java.util.Map; import org.apache.camel.Endpoint; import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriParam; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.HeaderFilterStrategyComponent; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.util.ErrorHandler; + +import static org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.DEFAULT_PREFETCH_COUNT; +import static org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.DEFAULT_SHUTDOWN_TIMEOUT; @Component("spring-rabbitmq") public class RabbitMQComponent extends HeaderFilterStrategyComponent { @@ -50,6 +55,10 @@ public class RabbitMQComponent extends HeaderFilterStrategyComponent { @Metadata(label = "advanced", description = "To use a custom MessagePropertiesConverter so you can be in control how to map to/from a org.springframework.amqp.core.MessageProperties.") private MessagePropertiesConverter messagePropertiesConverter; + @UriParam(label = "producer", javaType = "java.time.Duration", defaultValue = "5000", + description = "Specify the timeout in milliseconds to be used when waiting for a reply message when doing request/reply messaging." + + " The default value is 5 seconds. A negative value indicates an indefinite timeout.") + private long replyTimeout = 5000; @Metadata(label = "consumer", description = "The name of the dead letter exchange") private String deadLetterExchange; @Metadata(label = "consumer", description = "The name of the dead letter queue") @@ -59,6 +68,18 @@ public class RabbitMQComponent extends HeaderFilterStrategyComponent { @Metadata(label = "consumer", defaultValue = "direct", enums = "direct,fanout,headers,topic", description = "The type of the dead letter exchange") private String deadLetterExchangeType = "direct"; + @Metadata(label = "consumer,advanced", + description = "To use a custom ErrorHandler for handling exceptions from the message listener (consumer)") + private ErrorHandler errorHandler; + @Metadata(label = "consumer,advanced", defaultValue = "" + DEFAULT_PREFETCH_COUNT, + description = "Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput.") + private int prefetchCount = DEFAULT_PREFETCH_COUNT; + @Metadata(label = "consumer,advanced", javaType = "java.time.Duration", defaultValue = "" + DEFAULT_SHUTDOWN_TIMEOUT, + description = "The time to wait for workers in milliseconds after the container is stopped. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout.") + private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT; + @Metadata(label = "consumer,advanced", + description = "To use a custom factory for creating and configuring ListenerContainer to be used by the consumer for receiving messages") + private ListenerContainerFactory listenerContainerFactory = new DefaultListenerContainerFactory(); @Override protected void doInit() throws Exception { @@ -87,6 +108,7 @@ public class RabbitMQComponent extends HeaderFilterStrategyComponent { endpoint.setDeadLetterExchangeType(deadLetterExchangeType); endpoint.setDeadLetterQueue(deadLetterQueue); endpoint.setDeadLetterRoutingKey(deadLetterRoutingKey); + endpoint.setReplyTimeout(replyTimeout); setProperties(endpoint, parameters); return endpoint; @@ -171,4 +193,44 @@ public class RabbitMQComponent extends HeaderFilterStrategyComponent { public void setDeadLetterExchangeType(String deadLetterExchangeType) { this.deadLetterExchangeType = deadLetterExchangeType; } + + public ErrorHandler getErrorHandler() { + return errorHandler; + } + + public void setErrorHandler(ErrorHandler errorHandler) { + this.errorHandler = errorHandler; + } + + public int getPrefetchCount() { + return prefetchCount; + } + + public void setPrefetchCount(int prefetchCount) { + this.prefetchCount = prefetchCount; + } + + public long getReplyTimeout() { + return replyTimeout; + } + + public void setReplyTimeout(long replyTimeout) { + this.replyTimeout = replyTimeout; + } + + public long getShutdownTimeout() { + return shutdownTimeout; + } + + public void setShutdownTimeout(long shutdownTimeout) { + this.shutdownTimeout = shutdownTimeout; + } + + public ListenerContainerFactory getListenerContainerFactory() { + return listenerContainerFactory; + } + + public void setListenerContainerFactory(ListenerContainerFactory listenerContainerFactory) { + this.listenerContainerFactory = listenerContainerFactory; + } } diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQConsumer.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQConsumer.java index 37c3fbe..0a476f4 100644 --- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQConsumer.java +++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQConsumer.java @@ -25,18 +25,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.RabbitUtils; +import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; public class RabbitMQConsumer extends DefaultConsumer implements Suspendable { - // TODO: auto create/binding queue via AmqpAdmin - private static final Logger LOG = LoggerFactory.getLogger(RabbitMQConsumer.class); - private DefaultMessageListenerContainer listenerContainer; + private AbstractMessageListenerContainer listenerContainer; private volatile EndpointMessageListener messageListener; private volatile boolean initialized; - public RabbitMQConsumer(Endpoint endpoint, Processor processor, DefaultMessageListenerContainer listenerContainer) { + public RabbitMQConsumer(Endpoint endpoint, Processor processor, AbstractMessageListenerContainer listenerContainer) { super(endpoint, processor); this.listenerContainer = listenerContainer; this.listenerContainer.setMessageListener(getEndpointMessageListener()); @@ -56,9 +55,7 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable { protected void createMessageListener(RabbitMQEndpoint endpoint, Processor processor) { messageListener = new EndpointMessageListener(endpoint, processor); - getEndpoint().configureMessageListener(messageListener); - messageListener.setAsync(endpoint.isAsyncConsumer()); - messageListener.setDisableReplyTo(endpoint.isDisableReplyTo()); + endpoint.configureMessageListener(messageListener); } /** diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQEndpoint.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQEndpoint.java index 89cad7c..7eedfe7 100644 --- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQEndpoint.java +++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQEndpoint.java @@ -41,8 +41,8 @@ import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.AsyncRabbitTemplate; import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; import org.springframework.amqp.support.converter.MessageConverter; /** @@ -129,6 +129,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { + " handles the reply message. You can also use this option if you want to use Camel as a proxy between different" + " message brokers and you want to route message from one system to another.") private boolean disableReplyTo; + @UriParam(label = "producer", javaType = "java.time.Duration", defaultValue = "5000", + description = "Specify the timeout in milliseconds to be used when waiting for a reply message when doing request/reply messaging." + + " The default value is 5 seconds. A negative value indicates an indefinite timeout.") + private long replyTimeout = 5000; public RabbitMQEndpoint(String endpointUri, Component component, String exchangeName) { super(endpointUri, component); @@ -280,9 +284,17 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { this.disableReplyTo = disableReplyTo; } + public long getReplyTimeout() { + return replyTimeout; + } + + public void setReplyTimeout(long replyTimeout) { + this.replyTimeout = replyTimeout; + } + @Override public Consumer createConsumer(Processor processor) throws Exception { - DefaultMessageListenerContainer listenerContainer = createMessageListenerContainer(); + AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer(); RabbitMQConsumer consumer = new RabbitMQConsumer(this, processor, listenerContainer); configureConsumer(consumer); return consumer; @@ -346,27 +358,18 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { */ public AsyncRabbitTemplate createInOutTemplate() { RabbitTemplate template = new RabbitTemplate(getConnectionFactory()); - template.setRoutingKey(getRoutingKey()); + template.setRoutingKey(routingKey); + template.setReplyTimeout(replyTimeout); return new AsyncRabbitTemplate(template); } - public DefaultMessageListenerContainer createMessageListenerContainer() throws Exception { - DefaultMessageListenerContainer listener = new DefaultMessageListenerContainer(getConnectionFactory()); - if (getQueues() != null) { - listener.setQueueNames(getQueues().split(",")); - } - - AmqpAdmin admin = getComponent().getAmqpAdmin(); - if (autoDeclare && admin == null) { - admin = new RabbitAdmin(getConnectionFactory()); - } - listener.setAutoDeclare(autoDeclare); - listener.setAmqpAdmin(admin); - return listener; + public AbstractMessageListenerContainer createMessageListenerContainer() throws Exception { + return getComponent().getListenerContainerFactory().createListenerContainer(this); } public void configureMessageListener(EndpointMessageListener listener) { - // TODO: any endpoint options to configure + listener.setAsync(isAsyncConsumer()); + listener.setDisableReplyTo(isDisableReplyTo()); } protected boolean parseArgsBoolean(Map<String, Object> args, String key, String defaultValue) { @@ -381,10 +384,12 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { } } - public void declareElements(DefaultMessageListenerContainer container) { - AmqpAdmin admin = container.getAmqpAdmin(); + public void declareElements(AbstractMessageListenerContainer container) { + AmqpAdmin admin = null; + if (container instanceof DefaultMessageListenerContainer) { + admin = ((DefaultMessageListenerContainer) container).getAmqpAdmin(); + } if (admin != null && autoDeclare) { - // bind dead letter exchange if (deadLetterExchange != null) { ExchangeBuilder eb = new ExchangeBuilder(deadLetterExchange, deadLetterExchangeType); diff --git a/components/camel-spring-rabbitmq/src/test/resources/log4j2.properties b/components/camel-spring-rabbitmq/src/test/resources/log4j2.properties index 65dda4d..a9d4d76 100644 --- a/components/camel-spring-rabbitmq/src/test/resources/log4j2.properties +++ b/components/camel-spring-rabbitmq/src/test/resources/log4j2.properties @@ -26,7 +26,7 @@ appender.out.layout.type = PatternLayout appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n rootLogger.level = INFO rootLogger.appenderRef.file.ref = file -#rootLogger.appenderRef.out.ref = out +rootLogger.appenderRef.out.ref = out -#logger.rabbit.name = org.springframework.amqp.rabbit -#logger.rabbit.level = DEBUG \ No newline at end of file +logger.rabbit.name = org.springframework.amqp.rabbit +logger.rabbit.level = DEBUG \ No newline at end of file diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SpringRabbitmqComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SpringRabbitmqComponentBuilderFactory.java index 3ccdda0..7ab6d93 100644 --- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SpringRabbitmqComponentBuilderFactory.java +++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SpringRabbitmqComponentBuilderFactory.java @@ -204,6 +204,76 @@ public interface SpringRabbitmqComponentBuilderFactory { return this; } /** + * To use a custom ErrorHandler for handling exceptions from the message + * listener (consumer). + * + * The option is a: + * <code>org.springframework.util.ErrorHandler</code> type. + * + * Group: consumer (advanced) + * + * @param errorHandler the value to set + * @return the dsl builder + */ + default SpringRabbitmqComponentBuilder errorHandler( + org.springframework.util.ErrorHandler errorHandler) { + doSetProperty("errorHandler", errorHandler); + return this; + } + /** + * To use a custom factory for creating and configuring + * ListenerContainer to be used by the consumer for receiving messages. + * + * The option is a: + * <code>org.apache.camel.component.springrabbit.ListenerContainerFactory</code> type. + * + * Group: consumer (advanced) + * + * @param listenerContainerFactory the value to set + * @return the dsl builder + */ + default SpringRabbitmqComponentBuilder listenerContainerFactory( + org.apache.camel.component.springrabbit.ListenerContainerFactory listenerContainerFactory) { + doSetProperty("listenerContainerFactory", listenerContainerFactory); + return this; + } + /** + * Tell the broker how many messages to send to each consumer in a + * single request. Often this can be set quite high to improve + * throughput. + * + * The option is a: <code>int</code> type. + * + * Default: 250 + * Group: consumer (advanced) + * + * @param prefetchCount the value to set + * @return the dsl builder + */ + default SpringRabbitmqComponentBuilder prefetchCount(int prefetchCount) { + doSetProperty("prefetchCount", prefetchCount); + return this; + } + /** + * The time to wait for workers in milliseconds after the container is + * stopped. If any workers are active when the shutdown signal comes + * they will be allowed to finish processing as long as they can finish + * within this timeout. + * + * The option is a: <code>long</code> type. + * + * Default: 5000 + * Group: consumer (advanced) + * + * @param shutdownTimeout the value to set + * @return the dsl builder + */ + default SpringRabbitmqComponentBuilder shutdownTimeout( + long shutdownTimeout) { + doSetProperty("shutdownTimeout", shutdownTimeout); + return this; + } + /** * Whether the producer should be started lazy (on the first message). * By starting lazy you can use this to allow CamelContext and routes to * startup in situations where a producer may otherwise fail during @@ -326,6 +396,10 @@ public interface SpringRabbitmqComponentBuilderFactory { case "deadLetterExchangeType": ((RabbitMQComponent) component).setDeadLetterExchangeType((java.lang.String) value); return true; case "deadLetterQueue": ((RabbitMQComponent) component).setDeadLetterQueue((java.lang.String) value); return true; case "deadLetterRoutingKey": ((RabbitMQComponent) component).setDeadLetterRoutingKey((java.lang.String) value); return true; + case "errorHandler": ((RabbitMQComponent) component).setErrorHandler((org.springframework.util.ErrorHandler) value); return true; + case "listenerContainerFactory": ((RabbitMQComponent) component).setListenerContainerFactory((org.apache.camel.component.springrabbit.ListenerContainerFactory) value); return true; + case "prefetchCount": ((RabbitMQComponent) component).setPrefetchCount((int) value); return true; + case "shutdownTimeout": ((RabbitMQComponent) component).setShutdownTimeout((long) value); return true; case "lazyStartProducer": ((RabbitMQComponent) component).setLazyStartProducer((boolean) value); return true; case "autowiredEnabled": ((RabbitMQComponent) component).setAutowiredEnabled((boolean) value); return true; case "messageConverter": ((RabbitMQComponent) component).setMessageConverter((org.springframework.amqp.support.converter.MessageConverter) value); return true; diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java index f27a334..eb19436 100644 --- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java @@ -883,6 +883,40 @@ public interface RabbitMQEndpointBuilderFactory { return this; } /** + * Specify the timeout in milliseconds to be used when waiting for a + * reply message when doing request/reply messaging. The default value + * is 5 seconds. A negative value indicates an indefinite timeout. + * + * The option is a: <code>long</code> type. + * + * Default: 5000 + * Group: producer + * + * @param replyTimeout the value to set + * @return the dsl builder + */ + default RabbitMQEndpointProducerBuilder replyTimeout(long replyTimeout) { + doSetProperty("replyTimeout", replyTimeout); + return this; + } + /** + * Specify the timeout in milliseconds to be used when waiting for a + * reply message when doing request/reply messaging. The default value + * is 5 seconds. A negative value indicates an indefinite timeout. + * + * The option will be converted to a <code>long</code> type. + * + * Default: 5000 + * Group: producer + * + * @param replyTimeout the value to set + * @return the dsl builder + */ + default RabbitMQEndpointProducerBuilder replyTimeout(String replyTimeout) { + doSetProperty("replyTimeout", replyTimeout); + return this; + } + /** * Specifies whether to use transacted mode. * * The option is a: <code>boolean</code> type. diff --git a/docs/components/modules/ROOT/pages/spring-rabbitmq-component.adoc b/docs/components/modules/ROOT/pages/spring-rabbitmq-component.adoc index d8d3a04..1f36e4a 100644 --- a/docs/components/modules/ROOT/pages/spring-rabbitmq-component.adoc +++ b/docs/components/modules/ROOT/pages/spring-rabbitmq-component.adoc @@ -45,7 +45,7 @@ determines the exchange the queue will be bound to. == Options // component options: START -The Spring RabbitMQ component supports 14 options, which are listed below. +The Spring RabbitMQ component supports 18 options, which are listed below. @@ -61,6 +61,10 @@ The Spring RabbitMQ component supports 14 options, which are listed below. | *deadLetterExchangeType* (consumer) | The type of the dead letter exchange. There are 4 enums and the value can be one of: direct, fanout, headers, topic | direct | String | *deadLetterQueue* (consumer) | The name of the dead letter queue | | String | *deadLetterRoutingKey* (consumer) | The routing key for the dead letter exchange | | String +| *errorHandler* (consumer) | To use a custom ErrorHandler for handling exceptions from the message listener (consumer) | | ErrorHandler +| *listenerContainerFactory* (consumer) | To use a custom factory for creating and configuring ListenerContainer to be used by the consumer for receiving messages | | ListenerContainerFactory +| *prefetchCount* (consumer) | Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput. | 250 | int +| *shutdownTimeout* (consumer) | The time to wait for workers in milliseconds after the container is stopped. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout. | 5000 | long | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...] | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean | *messageConverter* (advanced) | To use a custom MessageConverter so you can be in control how to map to/from a org.springframework.amqp.core.Message. | | MessageConverter @@ -89,7 +93,7 @@ with the following path and query parameters: |=== -=== Query Parameters (22 parameters): +=== Query Parameters (23 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -112,6 +116,7 @@ with the following path and query parameters: | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut | | ExchangePattern | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...] +| *replyTimeout* (producer) | Specify the timeout in milliseconds to be used when waiting for a reply message when doing request/reply messaging. The default value is 5 seconds. A negative value indicates an indefinite timeout. | 5000 | long | *args* (advanced) | Specify arguments for configuring the different RabbitMQ concepts, a different prefix is required for each element: arg.exchange. arg.queue. arg.binding. arg.dlq.exchange. arg.dlq.queue. arg.dlq.binding. For example to declare a queue with message ttl argument: args=arg.queue.x-message-ttl=60000 | | Map | *messageConverter* (advanced) | To use a custom MessageConverter so you can be in control how to map to/from a org.springframework.amqp.core.Message. | | MessageConverter | *messagePropertiesConverter* (advanced) | To use a custom MessagePropertiesConverter so you can be in control how to map to/from a org.springframework.amqp.core.MessageProperties. | | MessagePropertiesConverter