Repository: camel
Updated Branches:
  refs/heads/master 2dfef38db -> 6e936bb7b


CAMEL-11666 - Camel Hazelcast Queue Consumer implementation


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9a21c070
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9a21c070
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9a21c070

Branch: refs/heads/master
Commit: 9a21c07007d8565c1d58b56cf07d9994e2325b1c
Parents: 2dfef38
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Wed Aug 23 12:09:44 2017 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Wed Aug 23 13:20:35 2017 +0200

----------------------------------------------------------------------
 .../main/docs/hazelcast-instance-component.adoc |  5 +-
 .../src/main/docs/hazelcast-list-component.adoc |  5 +-
 .../src/main/docs/hazelcast-map-component.adoc  |  5 +-
 .../main/docs/hazelcast-multimap-component.adoc |  5 +-
 .../main/docs/hazelcast-queue-component.adoc    |  5 +-
 .../docs/hazelcast-replicatedmap-component.adoc |  5 +-
 .../src/main/docs/hazelcast-seda-component.adoc |  5 +-
 .../src/main/docs/hazelcast-set-component.adoc  |  5 +-
 .../main/docs/hazelcast-topic-component.adoc    |  5 +-
 .../component/hazelcast/HazelcastComponent.java |  5 +-
 .../hazelcast/HazelcastDefaultEndpoint.java     |  3 +
 .../queue/HazelcastQueueComponent.java          |  5 +-
 .../queue/HazelcastQueueConfiguration.java      | 68 ++++++++++++++++
 .../hazelcast/queue/HazelcastQueueConsumer.java | 81 +++++++++++++++++--
 .../queue/HazelcastQueueConsumerMode.java       | 48 +++++++++++
 .../hazelcast/queue/HazelcastQueueEndpoint.java | 14 +++-
 .../HazelcastQueueConsumerPollTest.java         | 85 ++++++++++++++++++++
 17 files changed, 335 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-instance-component.adoc
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/docs/hazelcast-instance-component.adoc 
b/components/camel-hazelcast/src/main/docs/hazelcast-instance-component.adoc
index 34fb8a8..4c02dbe 100644
--- a/components/camel-hazelcast/src/main/docs/hazelcast-instance-component.adoc
+++ b/components/camel-hazelcast/src/main/docs/hazelcast-instance-component.adoc
@@ -38,7 +38,7 @@ with the following path and query parameters:
 | **cacheName** | *Required* The name of the cache |  | String
 |=======================================================================
 
-#### Query Parameters (13 parameters):
+#### Query Parameters (16 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -48,6 +48,9 @@ with the following path and query parameters:
 | **defaultOperation** (consumer) | To specify a default operation to use if 
no operation header has been provided. |  | HazelcastOperation
 | **hazelcastInstance** (consumer) | The hazelcast instance reference which 
can be used for hazelcast endpoint. |  | HazelcastInstance
 | **hazelcastInstanceName** (consumer) | The hazelcast instance reference name 
which can be used for hazelcast endpoint. If you don't specify the instance 
reference camel use the default hazelcast instance from the camel-hazelcast 
instance. |  | String
+| **pollingTimeout** (consumer) | Define the polling timeout of the consumer 
in Poll mode | 10000 | long
+| **poolSize** (consumer) | Define the Pool size for executor | 1 | int
+| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll 
| Listen | HazelcastQueueConsumer Mode
 | **exceptionHandler** (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
options is not in use. By default the consumer will deal with exceptions that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | **synchronous** (advanced) | Sets whether synchronous processing should be 
strictly used or Camel is allowed to use asynchronous processing (if 
supported). | false | boolean

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-list-component.adoc
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/docs/hazelcast-list-component.adoc 
b/components/camel-hazelcast/src/main/docs/hazelcast-list-component.adoc
index 97e1008..4643bf4 100644
--- a/components/camel-hazelcast/src/main/docs/hazelcast-list-component.adoc
+++ b/components/camel-hazelcast/src/main/docs/hazelcast-list-component.adoc
@@ -34,7 +34,7 @@ with the following path and query parameters:
 | **cacheName** | *Required* The name of the cache |  | String
 |=======================================================================
 
-#### Query Parameters (13 parameters):
+#### Query Parameters (16 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -44,6 +44,9 @@ with the following path and query parameters:
 | **hazelcastInstanceName** (common) | The hazelcast instance reference name 
which can be used for hazelcast endpoint. If you don't specify the instance 
reference camel use the default hazelcast instance from the camel-hazelcast 
instance. |  | String
 | **reliable** (common) | Define if the endpoint will use a reliable Topic 
struct or not. | false | boolean
 | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages or the likes will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN or ERROR level and ignored. | false | 
boolean
+| **pollingTimeout** (consumer) | Define the polling timeout of the consumer 
in Poll mode | 10000 | long
+| **poolSize** (consumer) | Define the Pool size for executor | 1 | int
+| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll 
| Listen | HazelcastQueueConsumer Mode
 | **exceptionHandler** (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
options is not in use. By default the consumer will deal with exceptions that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | **synchronous** (advanced) | Sets whether synchronous processing should be 
strictly used or Camel is allowed to use asynchronous processing (if 
supported). | false | boolean

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-map-component.adoc
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/docs/hazelcast-map-component.adoc 
b/components/camel-hazelcast/src/main/docs/hazelcast-map-component.adoc
index 6d2472b..cdbb9da 100644
--- a/components/camel-hazelcast/src/main/docs/hazelcast-map-component.adoc
+++ b/components/camel-hazelcast/src/main/docs/hazelcast-map-component.adoc
@@ -35,7 +35,7 @@ with the following path and query parameters:
 | **cacheName** | *Required* The name of the cache |  | String
 |=======================================================================
 
-#### Query Parameters (13 parameters):
+#### Query Parameters (16 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -45,6 +45,9 @@ with the following path and query parameters:
 | **hazelcastInstanceName** (common) | The hazelcast instance reference name 
which can be used for hazelcast endpoint. If you don't specify the instance 
reference camel use the default hazelcast instance from the camel-hazelcast 
instance. |  | String
 | **reliable** (common) | Define if the endpoint will use a reliable Topic 
struct or not. | false | boolean
 | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages or the likes will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN or ERROR level and ignored. | false | 
boolean
+| **pollingTimeout** (consumer) | Define the polling timeout of the consumer 
in Poll mode | 10000 | long
+| **poolSize** (consumer) | Define the Pool size for executor | 1 | int
+| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll 
| Listen | HazelcastQueueConsumer Mode
 | **exceptionHandler** (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
options is not in use. By default the consumer will deal with exceptions that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | **synchronous** (advanced) | Sets whether synchronous processing should be 
strictly used or Camel is allowed to use asynchronous processing (if 
supported). | false | boolean

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-multimap-component.adoc
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/docs/hazelcast-multimap-component.adoc 
b/components/camel-hazelcast/src/main/docs/hazelcast-multimap-component.adoc
index fc7281a..4516d7a 100644
--- a/components/camel-hazelcast/src/main/docs/hazelcast-multimap-component.adoc
+++ b/components/camel-hazelcast/src/main/docs/hazelcast-multimap-component.adoc
@@ -36,7 +36,7 @@ with the following path and query parameters:
 | **cacheName** | *Required* The name of the cache |  | String
 |=======================================================================
 
-#### Query Parameters (13 parameters):
+#### Query Parameters (16 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -46,6 +46,9 @@ with the following path and query parameters:
 | **hazelcastInstanceName** (common) | The hazelcast instance reference name 
which can be used for hazelcast endpoint. If you don't specify the instance 
reference camel use the default hazelcast instance from the camel-hazelcast 
instance. |  | String
 | **reliable** (common) | Define if the endpoint will use a reliable Topic 
struct or not. | false | boolean
 | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages or the likes will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN or ERROR level and ignored. | false | 
boolean
+| **pollingTimeout** (consumer) | Define the polling timeout of the consumer 
in Poll mode | 10000 | long
+| **poolSize** (consumer) | Define the Pool size for executor | 1 | int
+| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll 
| Listen | HazelcastQueueConsumer Mode
 | **exceptionHandler** (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
options is not in use. By default the consumer will deal with exceptions that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | **synchronous** (advanced) | Sets whether synchronous processing should be 
strictly used or Camel is allowed to use asynchronous processing (if 
supported). | false | boolean

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-queue-component.adoc
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/docs/hazelcast-queue-component.adoc 
b/components/camel-hazelcast/src/main/docs/hazelcast-queue-component.adoc
index 20a3bb4..a7d6aa4 100644
--- a/components/camel-hazelcast/src/main/docs/hazelcast-queue-component.adoc
+++ b/components/camel-hazelcast/src/main/docs/hazelcast-queue-component.adoc
@@ -35,7 +35,7 @@ with the following path and query parameters:
 | **cacheName** | *Required* The name of the cache |  | String
 |=======================================================================
 
-#### Query Parameters (13 parameters):
+#### Query Parameters (16 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -45,6 +45,9 @@ with the following path and query parameters:
 | **hazelcastInstanceName** (common) | The hazelcast instance reference name 
which can be used for hazelcast endpoint. If you don't specify the instance 
reference camel use the default hazelcast instance from the camel-hazelcast 
instance. |  | String
 | **reliable** (common) | Define if the endpoint will use a reliable Topic 
struct or not. | false | boolean
 | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages or the likes will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN or ERROR level and ignored. | false | 
boolean
+| **pollingTimeout** (consumer) | Define the polling timeout of the consumer 
in Poll mode | 10000 | long
+| **poolSize** (consumer) | Define the Pool size for executor | 1 | int
+| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll 
| Listen | HazelcastQueueConsumer Mode
 | **exceptionHandler** (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
options is not in use. By default the consumer will deal with exceptions that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | **synchronous** (advanced) | Sets whether synchronous processing should be 
strictly used or Camel is allowed to use asynchronous processing (if 
supported). | false | boolean

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-replicatedmap-component.adoc
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/docs/hazelcast-replicatedmap-component.adoc
 
b/components/camel-hazelcast/src/main/docs/hazelcast-replicatedmap-component.adoc
index 29fa092..43faf25 100644
--- 
a/components/camel-hazelcast/src/main/docs/hazelcast-replicatedmap-component.adoc
+++ 
b/components/camel-hazelcast/src/main/docs/hazelcast-replicatedmap-component.adoc
@@ -36,7 +36,7 @@ with the following path and query parameters:
 | **cacheName** | *Required* The name of the cache |  | String
 |=======================================================================
 
-#### Query Parameters (13 parameters):
+#### Query Parameters (16 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -46,6 +46,9 @@ with the following path and query parameters:
 | **hazelcastInstanceName** (common) | The hazelcast instance reference name 
which can be used for hazelcast endpoint. If you don't specify the instance 
reference camel use the default hazelcast instance from the camel-hazelcast 
instance. |  | String
 | **reliable** (common) | Define if the endpoint will use a reliable Topic 
struct or not. | false | boolean
 | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages or the likes will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN or ERROR level and ignored. | false | 
boolean
+| **pollingTimeout** (consumer) | Define the polling timeout of the consumer 
in Poll mode | 10000 | long
+| **poolSize** (consumer) | Define the Pool size for executor | 1 | int
+| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll 
| Listen | HazelcastQueueConsumer Mode
 | **exceptionHandler** (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
options is not in use. By default the consumer will deal with exceptions that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | **synchronous** (advanced) | Sets whether synchronous processing should be 
strictly used or Camel is allowed to use asynchronous processing (if 
supported). | false | boolean

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-seda-component.adoc
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/docs/hazelcast-seda-component.adoc 
b/components/camel-hazelcast/src/main/docs/hazelcast-seda-component.adoc
index 6961dce..00e1bb7 100644
--- a/components/camel-hazelcast/src/main/docs/hazelcast-seda-component.adoc
+++ b/components/camel-hazelcast/src/main/docs/hazelcast-seda-component.adoc
@@ -36,7 +36,7 @@ with the following path and query parameters:
 | **cacheName** | *Required* The name of the cache |  | String
 |=======================================================================
 
-#### Query Parameters (13 parameters):
+#### Query Parameters (16 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -46,6 +46,9 @@ with the following path and query parameters:
 | **hazelcastInstanceName** (common) | The hazelcast instance reference name 
which can be used for hazelcast endpoint. If you don't specify the instance 
reference camel use the default hazelcast instance from the camel-hazelcast 
instance. |  | String
 | **reliable** (common) | Define if the endpoint will use a reliable Topic 
struct or not. | false | boolean
 | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages or the likes will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN or ERROR level and ignored. | false | 
boolean
+| **pollingTimeout** (consumer) | Define the polling timeout of the consumer 
in Poll mode | 10000 | long
+| **poolSize** (consumer) | Define the Pool size for executor | 1 | int
+| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll 
| Listen | HazelcastQueueConsumer Mode
 | **exceptionHandler** (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
options is not in use. By default the consumer will deal with exceptions that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | **synchronous** (advanced) | Sets whether synchronous processing should be 
strictly used or Camel is allowed to use asynchronous processing (if 
supported). | false | boolean

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-set-component.adoc
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/docs/hazelcast-set-component.adoc 
b/components/camel-hazelcast/src/main/docs/hazelcast-set-component.adoc
index c6e2d56..3135752 100644
--- a/components/camel-hazelcast/src/main/docs/hazelcast-set-component.adoc
+++ b/components/camel-hazelcast/src/main/docs/hazelcast-set-component.adoc
@@ -35,7 +35,7 @@ with the following path and query parameters:
 | **cacheName** | *Required* The name of the cache |  | String
 |=======================================================================
 
-#### Query Parameters (13 parameters):
+#### Query Parameters (16 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -45,6 +45,9 @@ with the following path and query parameters:
 | **hazelcastInstanceName** (common) | The hazelcast instance reference name 
which can be used for hazelcast endpoint. If you don't specify the instance 
reference camel use the default hazelcast instance from the camel-hazelcast 
instance. |  | String
 | **reliable** (common) | Define if the endpoint will use a reliable Topic 
struct or not. | false | boolean
 | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages or the likes will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN or ERROR level and ignored. | false | 
boolean
+| **pollingTimeout** (consumer) | Define the polling timeout of the consumer 
in Poll mode | 10000 | long
+| **poolSize** (consumer) | Define the Pool size for executor | 1 | int
+| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll 
| Listen | HazelcastQueueConsumer Mode
 | **exceptionHandler** (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
options is not in use. By default the consumer will deal with exceptions that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | **synchronous** (advanced) | Sets whether synchronous processing should be 
strictly used or Camel is allowed to use asynchronous processing (if 
supported). | false | boolean

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/docs/hazelcast-topic-component.adoc
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/docs/hazelcast-topic-component.adoc 
b/components/camel-hazelcast/src/main/docs/hazelcast-topic-component.adoc
index 9af9ae5..542aa3e 100644
--- a/components/camel-hazelcast/src/main/docs/hazelcast-topic-component.adoc
+++ b/components/camel-hazelcast/src/main/docs/hazelcast-topic-component.adoc
@@ -35,7 +35,7 @@ with the following path and query parameters:
 | **cacheName** | *Required* The name of the cache |  | String
 |=======================================================================
 
-#### Query Parameters (13 parameters):
+#### Query Parameters (16 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -45,6 +45,9 @@ with the following path and query parameters:
 | **hazelcastInstanceName** (common) | The hazelcast instance reference name 
which can be used for hazelcast endpoint. If you don't specify the instance 
reference camel use the default hazelcast instance from the camel-hazelcast 
instance. |  | String
 | **reliable** (common) | Define if the endpoint will use a reliable Topic 
struct or not. | false | boolean
 | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages or the likes will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN or ERROR level and ignored. | false | 
boolean
+| **pollingTimeout** (consumer) | Define the polling timeout of the consumer 
in Poll mode | 10000 | long
+| **poolSize** (consumer) | Define the Pool size for executor | 1 | int
+| **queueConsumerMode** (consumer) | Define the Consumer mode: Listen or Poll 
| Listen | HazelcastQueueConsumer Mode
 | **exceptionHandler** (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
options is not in use. By default the consumer will deal with exceptions that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | **synchronous** (advanced) | Sets whether synchronous processing should be 
strictly used or Camel is allowed to use asynchronous processing (if 
supported). | false | boolean

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
index 55f8d0e..4d270db 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
@@ -35,6 +35,7 @@ import 
org.apache.camel.component.hazelcast.instance.HazelcastInstanceEndpoint;
 import org.apache.camel.component.hazelcast.list.HazelcastListEndpoint;
 import org.apache.camel.component.hazelcast.map.HazelcastMapEndpoint;
 import org.apache.camel.component.hazelcast.multimap.HazelcastMultimapEndpoint;
+import org.apache.camel.component.hazelcast.queue.HazelcastQueueConfiguration;
 import org.apache.camel.component.hazelcast.queue.HazelcastQueueEndpoint;
 import 
org.apache.camel.component.hazelcast.replicatedmap.HazelcastReplicatedmapEndpoint;
 import 
org.apache.camel.component.hazelcast.ringbuffer.HazelcastRingbufferEndpoint;
@@ -112,7 +113,9 @@ public class HazelcastComponent extends 
HazelcastDefaultComponent {
         if (remaining.startsWith(HazelcastConstants.QUEUE_PREFIX)) {
             // remaining is anything (name it foo ;)
             remaining = 
StringHelper.removeStartingCharacters(remaining.substring(HazelcastConstants.QUEUE_PREFIX.length()),
 '/');
-            endpoint = new HazelcastQueueEndpoint(hzInstance, uri, this, 
remaining);
+            final HazelcastQueueConfiguration config = new 
HazelcastQueueConfiguration();
+            setProperties(config, parameters);
+            endpoint = new HazelcastQueueEndpoint(hzInstance, uri, this, 
remaining, config);
             endpoint.setCommand(HazelcastCommand.queue);
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
index f97ed3f..995c34c 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
@@ -21,6 +21,7 @@ import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.component.hazelcast.queue.HazelcastQueueConfiguration;
 import org.apache.camel.component.hazelcast.seda.HazelcastSedaConfiguration;
 import org.apache.camel.component.hazelcast.topic.HazelcastTopicConfiguration;
 import org.apache.camel.impl.DefaultEndpoint;
@@ -46,6 +47,8 @@ public abstract class HazelcastDefaultEndpoint extends 
DefaultEndpoint {
     private HazelcastSedaConfiguration hazelcastSedaConfiguration; // to 
include component schema docs
     @UriParam
     private HazelcastTopicConfiguration hazelcastTopicConfiguration; 
+    @UriParam
+    private HazelcastQueueConfiguration hazelcastQueueConfiguration; 
 
     public HazelcastDefaultEndpoint(HazelcastInstance hazelcastInstance, 
String endpointUri, Component component) {
         this(hazelcastInstance, endpointUri, component, null);

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueComponent.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueComponent.java
index ba8bc75..53f2ef6 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueComponent.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueComponent.java
@@ -36,7 +36,10 @@ public class HazelcastQueueComponent extends 
HazelcastDefaultComponent {
 
     @Override
     protected HazelcastDefaultEndpoint doCreateEndpoint(String uri, String 
remaining, Map<String, Object> parameters, HazelcastInstance hzInstance) throws 
Exception {
-        return new HazelcastQueueEndpoint(hzInstance, uri, this, remaining);
+        final HazelcastQueueConfiguration config = new 
HazelcastQueueConfiguration();
+        setProperties(config, parameters);
+        HazelcastQueueEndpoint answer = new HazelcastQueueEndpoint(hzInstance, 
uri, this, remaining, config);
+        return answer;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConfiguration.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConfiguration.java
new file mode 100644
index 0000000..fe59479
--- /dev/null
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConfiguration.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.queue;
+
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+
+/**
+ * Hazelcast Queue Component configuration.
+ */
+@UriParams
+public class HazelcastQueueConfiguration {
+
+    @UriParam(label = "consumer", defaultValue = "10000")
+    private long pollingTimeout = 10000L;
+    @UriParam(label = "consumer", defaultValue = "Listen")
+    private HazelcastQueueConsumerMode queueConsumerMode = 
HazelcastQueueConsumerMode.LISTEN;
+    @UriParam(label = "consumer", defaultValue = "1")
+    private int poolSize = 1;
+
+
+    /**
+     * Define the polling timeout of the Queue consumer in Poll mode
+     */
+    public long getPollingTimeout() {
+        return pollingTimeout;
+    }
+
+    public void setPollingTimeout(long pollingTimeout) {
+        this.pollingTimeout = pollingTimeout;
+    }
+
+    /**
+     * Define the Queue Consumer mode: Listen or Poll 
+     */
+    public HazelcastQueueConsumerMode getQueueConsumerMode() {
+        return queueConsumerMode;
+    }
+
+    public void setQueueConsumerMode(HazelcastQueueConsumerMode 
queueConsumerMode) {
+        this.queueConsumerMode = queueConsumerMode;
+    }
+
+    /**
+     * Define the Pool size for Queue Consumer Executor
+     */
+    public int getPoolSize() {
+        return poolSize;
+    }
+
+    public void setPoolSize(int poolSize) {
+        this.poolSize = poolSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
index 65e1111..8c6fab5 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
@@ -16,24 +16,93 @@
  */
 package org.apache.camel.component.hazelcast.queue;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.core.IQueue;
 
 import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer;
 import org.apache.camel.component.hazelcast.listener.CamelItemListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-/**
- *
- */
 public class HazelcastQueueConsumer extends HazelcastDefaultConsumer {
 
-    public HazelcastQueueConsumer(HazelcastInstance hazelcastInstance, 
Endpoint endpoint, Processor processor, String cacheName) {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HazelcastQueueConsumer.class);
+    private final Processor processor;
+    private ExecutorService executor;
+    private QueueConsumerTask queueConsumerTask;
+    private HazelcastQueueConfiguration config;
+
+    public HazelcastQueueConsumer(HazelcastInstance hazelcastInstance, 
Endpoint endpoint, Processor processor, String cacheName, final 
HazelcastQueueConfiguration configuration) {
         super(hazelcastInstance, endpoint, processor, cacheName);
+        this.processor = processor;
+        this.config = configuration;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        executor = ((HazelcastQueueEndpoint)getEndpoint()).createExecutor();
+
+        CamelItemListener camelItemListener = new CamelItemListener(this, 
cacheName);
+        queueConsumerTask = new QueueConsumerTask(camelItemListener);
+        executor.submit(queueConsumerTask);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        if (executor != null) {
+            if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
+                
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            } else {
+                executor.shutdownNow();
+            }
+        }
+        executor = null;
+    }
+
+    class QueueConsumerTask implements Runnable {
+
+        CamelItemListener camelItemListener;
+
+        public QueueConsumerTask(CamelItemListener camelItemListener) {
+            this.camelItemListener = camelItemListener;
+        }
+
+        @Override
+        public void run() {
+            IQueue<Object> queue = hazelcastInstance.getQueue(cacheName);
+            if (config.getQueueConsumerMode() == 
HazelcastQueueConsumerMode.LISTEN) {
+                queue.addItemListener(camelItemListener, true);
+            }
 
-        IQueue<Object> queue = hazelcastInstance.getQueue(cacheName);
-        queue.addItemListener(new CamelItemListener(this, cacheName), true);
+            if (config.getQueueConsumerMode() == 
HazelcastQueueConsumerMode.POLL) {
+                while (isRunAllowed()) {
+                    try {
+                        final Object body = 
queue.poll(config.getPollingTimeout(), TimeUnit.MILLISECONDS);
+                        Exchange exchange = getEndpoint().createExchange();
+                        exchange.getOut().setBody(body);
+                        try {
+                            processor.process(exchange);
+                        } catch (Exception e) {
+                            getExceptionHandler().handleException("Error 
during processing", exchange, e);
+                        }
+                    } catch (InterruptedException e) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Hazelcast Queue Consumer Interrupted: " 
+ e, e);
+                            continue;
+                        }
+                    }
+                }
+            }
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumerMode.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumerMode.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumerMode.java
new file mode 100644
index 0000000..01c070d
--- /dev/null
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumerMode.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.queue;
+
+public enum HazelcastQueueConsumerMode {
+
+    LISTEN("listen"),
+    POLL("poll");
+
+
+    private static HazelcastQueueConsumerMode[] values = values();
+    private final String mode;
+
+    HazelcastQueueConsumerMode(String mode) {
+        this.mode = mode;
+    }
+
+    public static HazelcastQueueConsumerMode getHazelcastOperation(String 
name) {
+        if (name == null) {
+            return null;
+        }
+        for (HazelcastQueueConsumerMode hazelcastQueueConsumerMode : values) {
+            if (hazelcastQueueConsumerMode.toString().equalsIgnoreCase(name) 
|| hazelcastQueueConsumerMode.name().equalsIgnoreCase(name)) {
+                return hazelcastQueueConsumerMode;
+            }
+        }
+        throw new IllegalArgumentException(String.format("Mode '%s' is not 
supported by this component.", name));
+    }
+
+    public String toString() {
+        return mode;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java
index a484577..64568c3 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java
@@ -16,7 +16,10 @@
  */
 package org.apache.camel.component.hazelcast.queue;
 
+import java.util.concurrent.ExecutorService;
+
 import com.hazelcast.core.HazelcastInstance;
+
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
@@ -31,16 +34,19 @@ import org.apache.camel.spi.UriEndpoint;
  */
 @UriEndpoint(firstVersion = "2.7.0", scheme = "hazelcast-queue", title = 
"Hazelcast Queue", syntax = "hazelcast-queue:cacheName", label = 
"cache,datagrid")
 public class HazelcastQueueEndpoint extends HazelcastDefaultEndpoint {
+    
+    private final HazelcastQueueConfiguration configuration;
 
-    public HazelcastQueueEndpoint(HazelcastInstance hazelcastInstance, String 
endpointUri, Component component, String cacheName) {
+    public HazelcastQueueEndpoint(HazelcastInstance hazelcastInstance, String 
endpointUri, Component component, String cacheName, final 
HazelcastQueueConfiguration configuration) {
         super(hazelcastInstance, endpointUri, component, cacheName);
+        this.configuration = configuration;
         setCommand(HazelcastCommand.queue);
         setDefaultOperation(HazelcastOperation.ADD);
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        HazelcastQueueConsumer answer = new 
HazelcastQueueConsumer(hazelcastInstance, this, processor, cacheName);
+        HazelcastQueueConsumer answer = new 
HazelcastQueueConsumer(hazelcastInstance, this, processor, cacheName, 
configuration);
         configureConsumer(answer);
         return answer;
     }
@@ -49,5 +55,9 @@ public class HazelcastQueueEndpoint extends 
HazelcastDefaultEndpoint {
     public Producer createProducer() throws Exception {
         return new HazelcastQueueProducer(hazelcastInstance, this, cacheName);
     }
+    
+    public ExecutorService createExecutor() {
+        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, 
"QueueConsumer", configuration.getPoolSize());
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9a21c070/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java
 
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java
new file mode 100644
index 0000000..a6d7524
--- /dev/null
+++ 
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IQueue;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.atLeast;
+
+public class HazelcastQueueConsumerPollTest extends HazelcastCamelTestSupport {
+
+    @Mock
+    private IQueue<String> queue;
+    
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) 
{
+        when(hazelcastInstance.<String>getQueue("foo")).thenReturn(queue);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void verifyHazelcastInstance(HazelcastInstance 
hazelcastInstance) {
+        verify(hazelcastInstance).getQueue("foo");
+        try {
+            verify(queue, atLeast(1)).poll(10000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void add() throws InterruptedException {
+        when(queue.poll(10000, TimeUnit.MILLISECONDS)).thenReturn("foo");
+        
+        MockEndpoint out = getMockEndpoint("mock:result");
+        out.expectedMessageCount(1);
+
+
+        assertMockEndpointsSatisfied(2000, TimeUnit.MILLISECONDS);
+
+        
this.checkHeadersAbsence(out.getExchanges().get(0).getIn().getHeaders(), 
HazelcastConstants.ADDED);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(String.format("hazelcast-%sfoo?queueConsumerMode=Poll", 
HazelcastConstants.QUEUE_PREFIX)).to("mock:result");
+            }
+        };
+    }
+
+    private void checkHeadersAbsence(Map<String, Object> headers, String 
action) {
+        assertNotEquals(action, 
headers.get(HazelcastConstants.LISTENER_ACTION));
+        assertNotEquals(HazelcastConstants.CACHE_LISTENER, 
headers.get(HazelcastConstants.LISTENER_TYPE));
+        assertNull(headers.get(HazelcastConstants.LISTENER_TIME));
+    }
+
+}

Reply via email to