Repository: camel Updated Branches: refs/heads/master 2dfef38db -> 6e936bb7b
CAMEL-11666 - Camel Hazelcast Queue Consumer implementation Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9a21c070 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9a21c070 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9a21c070 Branch: refs/heads/master Commit: 9a21c07007d8565c1d58b56cf07d9994e2325b1c Parents: 2dfef38 Author: Andrea Cosentino <anco...@gmail.com> Authored: Wed Aug 23 12:09:44 2017 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Wed Aug 23 13:20:35 2017 +0200 ---------------------------------------------------------------------- .../main/docs/hazelcast-instance-component.adoc | 5 +- .../src/main/docs/hazelcast-list-component.adoc | 5 +- .../src/main/docs/hazelcast-map-component.adoc | 5 +- .../main/docs/hazelcast-multimap-component.adoc | 5 +- .../main/docs/hazelcast-queue-component.adoc | 5 +- .../docs/hazelcast-replicatedmap-component.adoc | 5 +- .../src/main/docs/hazelcast-seda-component.adoc | 5 +- .../src/main/docs/hazelcast-set-component.adoc | 5 +- .../main/docs/hazelcast-topic-component.adoc | 5 +- .../component/hazelcast/HazelcastComponent.java | 5 +- .../hazelcast/HazelcastDefaultEndpoint.java | 3 + .../queue/HazelcastQueueComponent.java | 5 +- .../queue/HazelcastQueueConfiguration.java | 68 ++++++++++++++++ .../hazelcast/queue/HazelcastQueueConsumer.java | 81 +++++++++++++++++-- .../queue/HazelcastQueueConsumerMode.java | 48 +++++++++++ .../hazelcast/queue/HazelcastQueueEndpoint.java | 14 +++- .../HazelcastQueueConsumerPollTest.java | 85 ++++++++++++++++++++ 17 files changed, 335 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-instance-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/docs/hazelcast-instance-component.adoc b/components/camel-hazelcast/src/main/docs/hazelcast-instance-component.adoc index 34fb8a8..4c02dbe 100644 --- a/components/camel-hazelcast/src/main/docs/hazelcast-instance-component.adoc +++ b/components/camel-hazelcast/src/main/docs/hazelcast-instance-component.adoc @@ -38,7 +38,7 @@ with the following path and query parameters: | **cacheName** | *Required* The name of the cache | | String |======================================================================= -#### Query Parameters (13 parameters): +#### Query Parameters (16 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= @@ -48,6 +48,9 @@ with the following path and query parameters: | **defaultOperation** (consumer) | To specify a default operation to use if no operation header has been provided. | | HazelcastOperation | **hazelcastInstance** (consumer) | The hazelcast instance reference which can be used for hazelcast endpoint. | | HazelcastInstance | **hazelcastInstanceName** (consumer) | The hazelcast instance reference name which can be used for hazelcast endpoint. If you don't specify the instance reference camel use the default hazelcast instance from the camel-hazelcast instance. | | String +| **pollingTimeout** (consumer) | Define the polling timeout of the consumer in Poll mode | 10000 | long +| **poolSize** (consumer) | Define the Pool size for executor | 1 | int +| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll | Listen | HazelcastQueueConsumer Mode | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-list-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/docs/hazelcast-list-component.adoc b/components/camel-hazelcast/src/main/docs/hazelcast-list-component.adoc index 97e1008..4643bf4 100644 --- a/components/camel-hazelcast/src/main/docs/hazelcast-list-component.adoc +++ b/components/camel-hazelcast/src/main/docs/hazelcast-list-component.adoc @@ -34,7 +34,7 @@ with the following path and query parameters: | **cacheName** | *Required* The name of the cache | | String |======================================================================= -#### Query Parameters (13 parameters): +#### Query Parameters (16 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= @@ -44,6 +44,9 @@ with the following path and query parameters: | **hazelcastInstanceName** (common) | The hazelcast instance reference name which can be used for hazelcast endpoint. If you don't specify the instance reference camel use the default hazelcast instance from the camel-hazelcast instance. | | String | **reliable** (common) | Define if the endpoint will use a reliable Topic struct or not. | false | boolean | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean +| **pollingTimeout** (consumer) | Define the polling timeout of the consumer in Poll mode | 10000 | long +| **poolSize** (consumer) | Define the Pool size for executor | 1 | int +| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll | Listen | HazelcastQueueConsumer Mode | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-map-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/docs/hazelcast-map-component.adoc b/components/camel-hazelcast/src/main/docs/hazelcast-map-component.adoc index 6d2472b..cdbb9da 100644 --- a/components/camel-hazelcast/src/main/docs/hazelcast-map-component.adoc +++ b/components/camel-hazelcast/src/main/docs/hazelcast-map-component.adoc @@ -35,7 +35,7 @@ with the following path and query parameters: | **cacheName** | *Required* The name of the cache | | String |======================================================================= -#### Query Parameters (13 parameters): +#### Query Parameters (16 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= @@ -45,6 +45,9 @@ with the following path and query parameters: | **hazelcastInstanceName** (common) | The hazelcast instance reference name which can be used for hazelcast endpoint. If you don't specify the instance reference camel use the default hazelcast instance from the camel-hazelcast instance. | | String | **reliable** (common) | Define if the endpoint will use a reliable Topic struct or not. | false | boolean | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean +| **pollingTimeout** (consumer) | Define the polling timeout of the consumer in Poll mode | 10000 | long +| **poolSize** (consumer) | Define the Pool size for executor | 1 | int +| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll | Listen | HazelcastQueueConsumer Mode | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-multimap-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/docs/hazelcast-multimap-component.adoc b/components/camel-hazelcast/src/main/docs/hazelcast-multimap-component.adoc index fc7281a..4516d7a 100644 --- a/components/camel-hazelcast/src/main/docs/hazelcast-multimap-component.adoc +++ b/components/camel-hazelcast/src/main/docs/hazelcast-multimap-component.adoc @@ -36,7 +36,7 @@ with the following path and query parameters: | **cacheName** | *Required* The name of the cache | | String |======================================================================= -#### Query Parameters (13 parameters): +#### Query Parameters (16 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= @@ -46,6 +46,9 @@ with the following path and query parameters: | **hazelcastInstanceName** (common) | The hazelcast instance reference name which can be used for hazelcast endpoint. If you don't specify the instance reference camel use the default hazelcast instance from the camel-hazelcast instance. | | String | **reliable** (common) | Define if the endpoint will use a reliable Topic struct or not. | false | boolean | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean +| **pollingTimeout** (consumer) | Define the polling timeout of the consumer in Poll mode | 10000 | long +| **poolSize** (consumer) | Define the Pool size for executor | 1 | int +| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll | Listen | HazelcastQueueConsumer Mode | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-queue-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/docs/hazelcast-queue-component.adoc b/components/camel-hazelcast/src/main/docs/hazelcast-queue-component.adoc index 20a3bb4..a7d6aa4 100644 --- a/components/camel-hazelcast/src/main/docs/hazelcast-queue-component.adoc +++ b/components/camel-hazelcast/src/main/docs/hazelcast-queue-component.adoc @@ -35,7 +35,7 @@ with the following path and query parameters: | **cacheName** | *Required* The name of the cache | | String |======================================================================= -#### Query Parameters (13 parameters): +#### Query Parameters (16 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= @@ -45,6 +45,9 @@ with the following path and query parameters: | **hazelcastInstanceName** (common) | The hazelcast instance reference name which can be used for hazelcast endpoint. If you don't specify the instance reference camel use the default hazelcast instance from the camel-hazelcast instance. | | String | **reliable** (common) | Define if the endpoint will use a reliable Topic struct or not. | false | boolean | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean +| **pollingTimeout** (consumer) | Define the polling timeout of the consumer in Poll mode | 10000 | long +| **poolSize** (consumer) | Define the Pool size for executor | 1 | int +| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll | Listen | HazelcastQueueConsumer Mode | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-replicatedmap-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/docs/hazelcast-replicatedmap-component.adoc b/components/camel-hazelcast/src/main/docs/hazelcast-replicatedmap-component.adoc index 29fa092..43faf25 100644 --- a/components/camel-hazelcast/src/main/docs/hazelcast-replicatedmap-component.adoc +++ b/components/camel-hazelcast/src/main/docs/hazelcast-replicatedmap-component.adoc @@ -36,7 +36,7 @@ with the following path and query parameters: | **cacheName** | *Required* The name of the cache | | String |======================================================================= -#### Query Parameters (13 parameters): +#### Query Parameters (16 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= @@ -46,6 +46,9 @@ with the following path and query parameters: | **hazelcastInstanceName** (common) | The hazelcast instance reference name which can be used for hazelcast endpoint. If you don't specify the instance reference camel use the default hazelcast instance from the camel-hazelcast instance. | | String | **reliable** (common) | Define if the endpoint will use a reliable Topic struct or not. | false | boolean | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean +| **pollingTimeout** (consumer) | Define the polling timeout of the consumer in Poll mode | 10000 | long +| **poolSize** (consumer) | Define the Pool size for executor | 1 | int +| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll | Listen | HazelcastQueueConsumer Mode | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-seda-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/docs/hazelcast-seda-component.adoc b/components/camel-hazelcast/src/main/docs/hazelcast-seda-component.adoc index 6961dce..00e1bb7 100644 --- a/components/camel-hazelcast/src/main/docs/hazelcast-seda-component.adoc +++ b/components/camel-hazelcast/src/main/docs/hazelcast-seda-component.adoc @@ -36,7 +36,7 @@ with the following path and query parameters: | **cacheName** | *Required* The name of the cache | | String |======================================================================= -#### Query Parameters (13 parameters): +#### Query Parameters (16 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= @@ -46,6 +46,9 @@ with the following path and query parameters: | **hazelcastInstanceName** (common) | The hazelcast instance reference name which can be used for hazelcast endpoint. If you don't specify the instance reference camel use the default hazelcast instance from the camel-hazelcast instance. | | String | **reliable** (common) | Define if the endpoint will use a reliable Topic struct or not. | false | boolean | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean +| **pollingTimeout** (consumer) | Define the polling timeout of the consumer in Poll mode | 10000 | long +| **poolSize** (consumer) | Define the Pool size for executor | 1 | int +| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll | Listen | HazelcastQueueConsumer Mode | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-set-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/docs/hazelcast-set-component.adoc b/components/camel-hazelcast/src/main/docs/hazelcast-set-component.adoc index c6e2d56..3135752 100644 --- a/components/camel-hazelcast/src/main/docs/hazelcast-set-component.adoc +++ b/components/camel-hazelcast/src/main/docs/hazelcast-set-component.adoc @@ -35,7 +35,7 @@ with the following path and query parameters: | **cacheName** | *Required* The name of the cache | | String |======================================================================= -#### Query Parameters (13 parameters): +#### Query Parameters (16 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= @@ -45,6 +45,9 @@ with the following path and query parameters: | **hazelcastInstanceName** (common) | The hazelcast instance reference name which can be used for hazelcast endpoint. If you don't specify the instance reference camel use the default hazelcast instance from the camel-hazelcast instance. | | String | **reliable** (common) | Define if the endpoint will use a reliable Topic struct or not. | false | boolean | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean +| **pollingTimeout** (consumer) | Define the polling timeout of the consumer in Poll mode | 10000 | long +| **poolSize** (consumer) | Define the Pool size for executor | 1 | int +| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll | Listen | HazelcastQueueConsumer Mode | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-topic-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/docs/hazelcast-topic-component.adoc b/components/camel-hazelcast/src/main/docs/hazelcast-topic-component.adoc index 9af9ae5..542aa3e 100644 --- a/components/camel-hazelcast/src/main/docs/hazelcast-topic-component.adoc +++ b/components/camel-hazelcast/src/main/docs/hazelcast-topic-component.adoc @@ -35,7 +35,7 @@ with the following path and query parameters: | **cacheName** | *Required* The name of the cache | | String |======================================================================= -#### Query Parameters (13 parameters): +#### Query Parameters (16 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= @@ -45,6 +45,9 @@ with the following path and query parameters: | **hazelcastInstanceName** (common) | The hazelcast instance reference name which can be used for hazelcast endpoint. If you don't specify the instance reference camel use the default hazelcast instance from the camel-hazelcast instance. | | String | **reliable** (common) | Define if the endpoint will use a reliable Topic struct or not. | false | boolean | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean +| **pollingTimeout** (consumer) | Define the polling timeout of the consumer in Poll mode | 10000 | long +| **poolSize** (consumer) | Define the Pool size for executor | 1 | int +| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll | Listen | HazelcastQueueConsumer Mode | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java index 55f8d0e..4d270db 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java @@ -35,6 +35,7 @@ import org.apache.camel.component.hazelcast.instance.HazelcastInstanceEndpoint; import org.apache.camel.component.hazelcast.list.HazelcastListEndpoint; import org.apache.camel.component.hazelcast.map.HazelcastMapEndpoint; import org.apache.camel.component.hazelcast.multimap.HazelcastMultimapEndpoint; +import org.apache.camel.component.hazelcast.queue.HazelcastQueueConfiguration; import org.apache.camel.component.hazelcast.queue.HazelcastQueueEndpoint; import org.apache.camel.component.hazelcast.replicatedmap.HazelcastReplicatedmapEndpoint; import org.apache.camel.component.hazelcast.ringbuffer.HazelcastRingbufferEndpoint; @@ -112,7 +113,9 @@ public class HazelcastComponent extends HazelcastDefaultComponent { if (remaining.startsWith(HazelcastConstants.QUEUE_PREFIX)) { // remaining is anything (name it foo ;) remaining = StringHelper.removeStartingCharacters(remaining.substring(HazelcastConstants.QUEUE_PREFIX.length()), '/'); - endpoint = new HazelcastQueueEndpoint(hzInstance, uri, this, remaining); + final HazelcastQueueConfiguration config = new HazelcastQueueConfiguration(); + setProperties(config, parameters); + endpoint = new HazelcastQueueEndpoint(hzInstance, uri, this, remaining, config); endpoint.setCommand(HazelcastCommand.queue); } http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java index f97ed3f..995c34c 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java @@ -21,6 +21,7 @@ import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.component.hazelcast.queue.HazelcastQueueConfiguration; import org.apache.camel.component.hazelcast.seda.HazelcastSedaConfiguration; import org.apache.camel.component.hazelcast.topic.HazelcastTopicConfiguration; import org.apache.camel.impl.DefaultEndpoint; @@ -46,6 +47,8 @@ public abstract class HazelcastDefaultEndpoint extends DefaultEndpoint { private HazelcastSedaConfiguration hazelcastSedaConfiguration; // to include component schema docs @UriParam private HazelcastTopicConfiguration hazelcastTopicConfiguration; + @UriParam + private HazelcastQueueConfiguration hazelcastQueueConfiguration; public HazelcastDefaultEndpoint(HazelcastInstance hazelcastInstance, String endpointUri, Component component) { this(hazelcastInstance, endpointUri, component, null); http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueComponent.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueComponent.java index ba8bc75..53f2ef6 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueComponent.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueComponent.java @@ -36,7 +36,10 @@ public class HazelcastQueueComponent extends HazelcastDefaultComponent { @Override protected HazelcastDefaultEndpoint doCreateEndpoint(String uri, String remaining, Map<String, Object> parameters, HazelcastInstance hzInstance) throws Exception { - return new HazelcastQueueEndpoint(hzInstance, uri, this, remaining); + final HazelcastQueueConfiguration config = new HazelcastQueueConfiguration(); + setProperties(config, parameters); + HazelcastQueueEndpoint answer = new HazelcastQueueEndpoint(hzInstance, uri, this, remaining, config); + return answer; } } http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConfiguration.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConfiguration.java new file mode 100644 index 0000000..fe59479 --- /dev/null +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConfiguration.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hazelcast.queue; + +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; + +/** + * Hazelcast Queue Component configuration. + */ +@UriParams +public class HazelcastQueueConfiguration { + + @UriParam(label = "consumer", defaultValue = "10000") + private long pollingTimeout = 10000L; + @UriParam(label = "consumer", defaultValue = "Listen") + private HazelcastQueueConsumerMode queueConsumerMode = HazelcastQueueConsumerMode.LISTEN; + @UriParam(label = "consumer", defaultValue = "1") + private int poolSize = 1; + + + /** + * Define the polling timeout of the Queue consumer in Poll mode + */ + public long getPollingTimeout() { + return pollingTimeout; + } + + public void setPollingTimeout(long pollingTimeout) { + this.pollingTimeout = pollingTimeout; + } + + /** + * Define the Queue Consumer mode: Listen or Poll + */ + public HazelcastQueueConsumerMode getQueueConsumerMode() { + return queueConsumerMode; + } + + public void setQueueConsumerMode(HazelcastQueueConsumerMode queueConsumerMode) { + this.queueConsumerMode = queueConsumerMode; + } + + /** + * Define the Pool size for Queue Consumer Executor + */ + public int getPoolSize() { + return poolSize; + } + + public void setPoolSize(int poolSize) { + this.poolSize = poolSize; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java index 65e1111..8c6fab5 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java @@ -16,24 +16,93 @@ */ package org.apache.camel.component.hazelcast.queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IQueue; import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer; import org.apache.camel.component.hazelcast.listener.CamelItemListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -/** - * - */ public class HazelcastQueueConsumer extends HazelcastDefaultConsumer { - public HazelcastQueueConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName) { + private static final Logger LOG = LoggerFactory.getLogger(HazelcastQueueConsumer.class); + private final Processor processor; + private ExecutorService executor; + private QueueConsumerTask queueConsumerTask; + private HazelcastQueueConfiguration config; + + public HazelcastQueueConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName, final HazelcastQueueConfiguration configuration) { super(hazelcastInstance, endpoint, processor, cacheName); + this.processor = processor; + this.config = configuration; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + executor = ((HazelcastQueueEndpoint)getEndpoint()).createExecutor(); + + CamelItemListener camelItemListener = new CamelItemListener(this, cacheName); + queueConsumerTask = new QueueConsumerTask(camelItemListener); + executor.submit(queueConsumerTask); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + if (executor != null) { + if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); + } else { + executor.shutdownNow(); + } + } + executor = null; + } + + class QueueConsumerTask implements Runnable { + + CamelItemListener camelItemListener; + + public QueueConsumerTask(CamelItemListener camelItemListener) { + this.camelItemListener = camelItemListener; + } + + @Override + public void run() { + IQueue<Object> queue = hazelcastInstance.getQueue(cacheName); + if (config.getQueueConsumerMode() == HazelcastQueueConsumerMode.LISTEN) { + queue.addItemListener(camelItemListener, true); + } - IQueue<Object> queue = hazelcastInstance.getQueue(cacheName); - queue.addItemListener(new CamelItemListener(this, cacheName), true); + if (config.getQueueConsumerMode() == HazelcastQueueConsumerMode.POLL) { + while (isRunAllowed()) { + try { + final Object body = queue.poll(config.getPollingTimeout(), TimeUnit.MILLISECONDS); + Exchange exchange = getEndpoint().createExchange(); + exchange.getOut().setBody(body); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Hazelcast Queue Consumer Interrupted: " + e, e); + continue; + } + } + } + } + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumerMode.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumerMode.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumerMode.java new file mode 100644 index 0000000..01c070d --- /dev/null +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumerMode.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hazelcast.queue; + +public enum HazelcastQueueConsumerMode { + + LISTEN("listen"), + POLL("poll"); + + + private static HazelcastQueueConsumerMode[] values = values(); + private final String mode; + + HazelcastQueueConsumerMode(String mode) { + this.mode = mode; + } + + public static HazelcastQueueConsumerMode getHazelcastOperation(String name) { + if (name == null) { + return null; + } + for (HazelcastQueueConsumerMode hazelcastQueueConsumerMode : values) { + if (hazelcastQueueConsumerMode.toString().equalsIgnoreCase(name) || hazelcastQueueConsumerMode.name().equalsIgnoreCase(name)) { + return hazelcastQueueConsumerMode; + } + } + throw new IllegalArgumentException(String.format("Mode '%s' is not supported by this component.", name)); + } + + public String toString() { + return mode; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java index a484577..64568c3 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java @@ -16,7 +16,10 @@ */ package org.apache.camel.component.hazelcast.queue; +import java.util.concurrent.ExecutorService; + import com.hazelcast.core.HazelcastInstance; + import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Processor; @@ -31,16 +34,19 @@ import org.apache.camel.spi.UriEndpoint; */ @UriEndpoint(firstVersion = "2.7.0", scheme = "hazelcast-queue", title = "Hazelcast Queue", syntax = "hazelcast-queue:cacheName", label = "cache,datagrid") public class HazelcastQueueEndpoint extends HazelcastDefaultEndpoint { + + private final HazelcastQueueConfiguration configuration; - public HazelcastQueueEndpoint(HazelcastInstance hazelcastInstance, String endpointUri, Component component, String cacheName) { + public HazelcastQueueEndpoint(HazelcastInstance hazelcastInstance, String endpointUri, Component component, String cacheName, final HazelcastQueueConfiguration configuration) { super(hazelcastInstance, endpointUri, component, cacheName); + this.configuration = configuration; setCommand(HazelcastCommand.queue); setDefaultOperation(HazelcastOperation.ADD); } @Override public Consumer createConsumer(Processor processor) throws Exception { - HazelcastQueueConsumer answer = new HazelcastQueueConsumer(hazelcastInstance, this, processor, cacheName); + HazelcastQueueConsumer answer = new HazelcastQueueConsumer(hazelcastInstance, this, processor, cacheName, configuration); configureConsumer(answer); return answer; } @@ -49,5 +55,9 @@ public class HazelcastQueueEndpoint extends HazelcastDefaultEndpoint { public Producer createProducer() throws Exception { return new HazelcastQueueProducer(hazelcastInstance, this, cacheName); } + + public ExecutorService createExecutor() { + return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "QueueConsumer", configuration.getPoolSize()); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java new file mode 100644 index 0000000..a6d7524 --- /dev/null +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hazelcast; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.IQueue; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; +import org.mockito.Mock; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.atLeast; + +public class HazelcastQueueConsumerPollTest extends HazelcastCamelTestSupport { + + @Mock + private IQueue<String> queue; + + @Override + @SuppressWarnings("unchecked") + protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) { + when(hazelcastInstance.<String>getQueue("foo")).thenReturn(queue); + } + + @Override + @SuppressWarnings("unchecked") + protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) { + verify(hazelcastInstance).getQueue("foo"); + try { + verify(queue, atLeast(1)).poll(10000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void add() throws InterruptedException { + when(queue.poll(10000, TimeUnit.MILLISECONDS)).thenReturn("foo"); + + MockEndpoint out = getMockEndpoint("mock:result"); + out.expectedMessageCount(1); + + + assertMockEndpointsSatisfied(2000, TimeUnit.MILLISECONDS); + + this.checkHeadersAbsence(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.ADDED); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(String.format("hazelcast-%sfoo?queueConsumerMode=Poll", HazelcastConstants.QUEUE_PREFIX)).to("mock:result"); + } + }; + } + + private void checkHeadersAbsence(Map<String, Object> headers, String action) { + assertNotEquals(action, headers.get(HazelcastConstants.LISTENER_ACTION)); + assertNotEquals(HazelcastConstants.CACHE_LISTENER, headers.get(HazelcastConstants.LISTENER_TYPE)); + assertNull(headers.get(HazelcastConstants.LISTENER_TIME)); + } + +}