Repository: camel
Updated Branches:
  refs/heads/master 1aea80a26 -> b62f330c6


CAMEL-8992 Camel-Hazelcast: add drainTo operation to hazelcast queue producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f8b9b1d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f8b9b1d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f8b9b1d

Branch: refs/heads/master
Commit: 3f8b9b1d461c17f1b72803e7cfbfdb992b658adb
Parents: 1aea80a
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Tue Jun 23 17:14:32 2015 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Tue Jun 23 17:14:32 2015 +0200

----------------------------------------------------------------------
 .../component/hazelcast/HazelcastConstants.java |  4 ++-
 .../hazelcast/queue/HazelcastQueueProducer.java | 26 ++++++++++++++++++++
 .../hazelcast/HazelcastQueueProducerTest.java   | 20 +++++++++++++++
 3 files changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3f8b9b1d/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
index a146d46..6c42706 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
@@ -41,6 +41,7 @@ public final class HazelcastConstants {
     public static final String TTL_UNIT = "CamelHazelcastObjectTtlUnit";
     public static final String QUERY = "CamelHazelcastQuery";
     public static final String EXPECTED_VALUE = "CamelHazelcastExpectedValue";
+    public static final String DRAIN_TO_COLLECTION = 
"CamelHazelcastDrainToCollection";
 
     /*
      * outgoing header properties
@@ -89,9 +90,10 @@ public final class HazelcastConstants {
     public static final int PEEK_OPERATION = 33;
     public static final int POLL_OPERATION = 34;
     public static final int REMAINING_CAPACITY_OPERATION = 35;
+    public static final int DRAIN_TO_OPERATION = 36;
 
     // topic
-    public static final int PUBLISH_OPERATION = 36;
+    public static final int PUBLISH_OPERATION = 37;
 
     /*
      * header values

http://git-wip-us.apache.org/repos/asf/camel/blob/3f8b9b1d/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
index c814e3c..671eb60 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.hazelcast.queue;
 
+import java.util.Collection;
+import java.util.Map;
+
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.core.IQueue;
 
@@ -24,6 +27,7 @@ import org.apache.camel.component.hazelcast.HazelcastComponentHelper;
 import org.apache.camel.component.hazelcast.HazelcastConstants;
 import org.apache.camel.component.hazelcast.HazelcastDefaultEndpoint;
 import org.apache.camel.component.hazelcast.HazelcastDefaultProducer;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  *
@@ -38,6 +42,15 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer {
     }
 
     public void process(Exchange exchange) throws Exception {
+        
+        Map<String, Object> headers = exchange.getIn().getHeaders();
+
+        // get header parameters
+        Object draintToCollection = null;
+        
+        if (headers.containsKey(HazelcastConstants.DRAIN_TO_COLLECTION)) {
+            draintToCollection = 
headers.get(HazelcastConstants.DRAIN_TO_COLLECTION);
+        }
 
         final int operation = lookupOperationNumber(exchange);
 
@@ -73,6 +86,14 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer {
             this.remainingCapacity(exchange);
             break;
             
+        case HazelcastConstants.DRAIN_TO_OPERATION:
+            if (ObjectHelper.isNotEmpty(draintToCollection)) {
+                this.drainTo((Collection) draintToCollection, exchange);
+            } else {
+                throw new IllegalArgumentException("Drain to collection header 
must be specified");
+            }
+            break;
+            
         default:
             throw new IllegalArgumentException(String.format("The value '%s' 
is not allowed for parameter '%s' on the QUEUE cache.", operation, 
HazelcastConstants.OPERATION));
         }
@@ -117,4 +138,9 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer {
     private void remainingCapacity(Exchange exchange) {
         exchange.getOut().setBody(this.queue.remainingCapacity());
     }
+    
+    private void drainTo(Collection c, Exchange exchange) {
+        exchange.getOut().setBody(this.queue.drainTo(c));
+        exchange.getOut().setHeader(HazelcastConstants.DRAIN_TO_COLLECTION, c);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/3f8b9b1d/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java
index 3807e98..24a4a78 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java
@@ -16,8 +16,14 @@
  */
 package org.apache.camel.component.hazelcast;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.core.IQueue;
+
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.After;
@@ -122,6 +128,17 @@ public class HazelcastQueueProducerTest extends HazelcastCamelTestSupport {
         verify(queue).remainingCapacity();
         assertEquals(10, answer);
     }
+    
+    @Test
+    public void drainTo() throws InterruptedException {
+        Map<String, Object> headers = new HashMap<String, Object>();
+        Collection l = new ArrayList<>();
+        headers.put(HazelcastConstants.DRAIN_TO_COLLECTION, l);
+        when(queue.drainTo(l)).thenReturn(10);
+        int answer = template.requestBodyAndHeaders("direct:drainTo", "test", 
headers, Integer.class);
+        verify(queue).drainTo(l);
+        assertEquals(10, answer);
+    }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
@@ -148,6 +165,9 @@ public class HazelcastQueueProducerTest extends HazelcastCamelTestSupport {
                 
from("direct:remainingCapacity").setHeader(HazelcastConstants.OPERATION, 
constant(HazelcastConstants.REMAINING_CAPACITY_OPERATION)).to(
                         String.format("hazelcast:%sbar", 
HazelcastConstants.QUEUE_PREFIX));
                 
+                from("direct:drainTo").setHeader(HazelcastConstants.OPERATION, 
constant(HazelcastConstants.DRAIN_TO_OPERATION)).to(
+                        String.format("hazelcast:%sbar", 
HazelcastConstants.QUEUE_PREFIX));
+                
                 
from("direct:putWithOperationNumber").toF(String.format("hazelcast:%sbar?operation=%s",
 HazelcastConstants.QUEUE_PREFIX, HazelcastConstants.PUT_OPERATION));
 
                 
from("direct:putWithOperationName").toF(String.format("hazelcast:%sbar?operation=put",
 HazelcastConstants.QUEUE_PREFIX));

Reply via email to