Repository: camel
Updated Branches:
  refs/heads/master 381d53657 -> bc07f3168


Resolve CAMEL-8681 Camel-Infinispan: use Lifespan and Max Idle Time in the 
implementation


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/105525e5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/105525e5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/105525e5

Branch: refs/heads/master
Commit: 105525e5a9c42c0706acdfff667cd50671b4fb0b
Parents: 381d536
Author: ancosen <anco...@gmail.com>
Authored: Tue Apr 21 22:13:45 2015 +0200
Committer: ancosen <anco...@gmail.com>
Committed: Tue Apr 21 22:13:45 2015 +0200

----------------------------------------------------------------------
 .../infinispan/InfinispanConstants.java         |   6 +
 .../infinispan/InfinispanOperation.java         |  59 +++++-
 .../infinispan/InfinispanProducerTest.java      | 181 +++++++++++++++++--
 3 files changed, 226 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/105525e5/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
index 38221fa..6f69e7f 100644
--- 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
+++ 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
@@ -22,14 +22,20 @@ public interface InfinispanConstants {
     String CACHE_NAME = "CamelInfinispanCacheName";
     String KEY = "CamelInfinispanKey";
     String VALUE = "CamelInfinispanValue";
+    String MAP = "CamelInfinispanMap";    
     String OPERATION = "CamelInfinispanOperation";
     String PUT = "CamelInfinispanOperationPut";
     String PUT_IF_ABSENT = "CamelInfinispanOperationPutIfAbsent";
     String GET = "CamelInfinispanOperationGet";
     String CONTAINS_KEY = "CamelInfinispanOperationContainsKey";
     String CONTAINS_VALUE = "CamelInfinispanOperationContainsValue";
+    String PUT_ALL = "CamelInfinispanOperationPutAll";
     String REMOVE = "CamelInfinispanOperationRemove";
     String REPLACE = "CamelInfinispanOperationReplace";
     String CLEAR = "CamelInfinispanOperationClear";
     String RESULT = "CamelInfinispanOperationResult";
+    String LIFESPAN_TIME = "CamelInfinispanLifespanTime";
+    String LIFESPAN_TIME_UNIT = "CamelInfinispanTimeUnit";
+    String MAX_IDLE_TIME = "CamelInfinispanMaxIdleTime";
+    String MAX_IDLE_TIME_UNIT = "CamelInfinispanMaxIdleTimeUnit";
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/105525e5/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
index 023062d..3fb0a88 100644
--- 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
+++ 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
@@ -16,7 +16,11 @@
  */
 package org.apache.camel.component.infinispan;
 
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.Exchange;
+import org.apache.camel.util.ObjectHelper;
 import org.infinispan.commons.api.BasicCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,13 +57,60 @@ public class InfinispanOperation {
         PUT {
             @Override
             void execute(BasicCache<Object, Object> cache, Exchange exchange) {
-                Object result = cache.put(getKey(exchange), 
getValue(exchange));
+                Object result;
+                if 
(!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME))
 && 
!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT)))
 {
+                    long lifespan = (long) 
exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME);
+                    String timeUnit =  (String) 
exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT);
+                    if 
(!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME))
 
+                        && 
!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT)))
 {
+                        long maxIdle = (long) 
exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME);
+                        String maxIdleTimeUnit =  (String) 
exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT);
+                        result = cache.put(getKey(exchange), 
getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, 
TimeUnit.valueOf(maxIdleTimeUnit));
+                    } else {
+                        result = cache.put(getKey(exchange), 
getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                    }
+                } else {
+                    result = cache.put(getKey(exchange), getValue(exchange));
+                }
                 setResult(result, exchange);
             }
+        }, PUTALL {
+            @Override
+            void execute(BasicCache<Object, Object> cache, Exchange exchange) {
+                Object result;
+                if 
(!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME))
 && 
!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT)))
 {
+                    long lifespan = (long) 
exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME);
+                    String timeUnit =  (String) 
exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT);
+                    if 
(!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME))
 
+                        && 
!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT)))
 {
+                        long maxIdle = (long) 
exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME);
+                        String maxIdleTimeUnit =  (String) 
exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT);
+                        cache.putAll(getMap(exchange), lifespan, 
TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                    } else {
+                        cache.putAll(getMap(exchange), lifespan, 
TimeUnit.valueOf(timeUnit));
+                    }
+                } else {
+                    cache.putAll(getMap(exchange));
+                }
+            }
         }, PUTIFABSENT {
             @Override
             void execute(BasicCache<Object, Object> cache, Exchange exchange) {
-                Object result = cache.putIfAbsent(getKey(exchange), 
getValue(exchange));
+                Object result;
+                if 
(!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME))
 && 
!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT)))
 {
+                    long lifespan = (long) 
exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME);
+                    String timeUnit =  (String) 
exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT);
+                    if 
(!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME))
 
+                        && 
!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT)))
 {
+                        long maxIdle = (long) 
exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME);
+                        String maxIdleTimeUnit =  (String) 
exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT);
+                        result = cache.putIfAbsent(getKey(exchange), 
getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, 
TimeUnit.valueOf(maxIdleTimeUnit));
+                    } else {
+                        result = cache.putIfAbsent(getKey(exchange), 
getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                    }
+                } else {
+                    result = cache.putIfAbsent(getKey(exchange), 
getValue(exchange));
+                }
                 setResult(result, exchange);
             }
         }, GET {
@@ -110,6 +161,10 @@ public class InfinispanOperation {
         Object getValue(Exchange exchange) {
             return exchange.getIn().getHeader(InfinispanConstants.VALUE);
         }
+        
+        Map<? extends Object, ? extends Object>  getMap(Exchange exchange) {
+            return (Map<? extends Object, ? extends Object>) 
exchange.getIn().getHeader(InfinispanConstants.MAP);
+        }
 
         abstract void execute(BasicCache<Object, Object> cache, Exchange 
exchange);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/105525e5/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
index 4061864..26d6d9e 100644
--- 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
+++ 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
@@ -16,18 +16,22 @@
  */
 package org.apache.camel.component.infinispan;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.core.Is.is;
-
 public class InfinispanProducerTest extends InfinispanTestSupport {
 
     private static final String COMMAND_VALUE = "commandValue";
     private static final String COMMAND_KEY = "commandKey1";
+    private static final long LIFESPAN_TIME = 5;
+    private static final long LIFESPAN_FOR_MAX_IDLE = -1;
+    private static final long MAX_IDLE_TIME = 3;
 
     @Test
     public void keyAndValueArePublishedWithDefaultOperation() throws Exception 
{
@@ -40,7 +44,7 @@ public class InfinispanProducerTest extends 
InfinispanTestSupport {
         });
 
         Object value = currentCache().get(KEY_ONE);
-        assertThat(value.toString(), is(VALUE_ONE));
+        assertEquals(value.toString(), VALUE_ONE);
     }
 
     @Test
@@ -55,7 +59,110 @@ public class InfinispanProducerTest extends 
InfinispanTestSupport {
         });
 
         Object value = currentCache().get(KEY_ONE);
-        assertThat(value.toString(), is(VALUE_ONE));
+        assertEquals(value.toString(), VALUE_ONE);
+    }
+    
+    @Test
+    public void publishMapNormal() throws Exception {
+        template.send("direct:start", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                Map<String, String> map = new HashMap<String, String>();
+                map.put(KEY_ONE, VALUE_ONE);
+                map.put(KEY_TWO, VALUE_TWO);
+                exchange.getIn().setHeader(InfinispanConstants.MAP, map);
+                exchange.getIn().setHeader(InfinispanConstants.OPERATION, 
InfinispanConstants.PUT_ALL);
+            }
+        });
+
+        assertEquals(currentCache().size(), 2);
+        Object value = currentCache().get(KEY_ONE);
+        assertEquals(value.toString(), VALUE_ONE);
+        value = currentCache().get(KEY_TWO);
+        assertEquals(value.toString(), VALUE_TWO);
+    }
+    
+    @Test
+    public void publishMapWithLifespan() throws Exception {
+        template.send("direct:start", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                Map<String, String> map = new HashMap<String, String>();
+                map.put(KEY_ONE, VALUE_ONE);
+                map.put(KEY_TWO, VALUE_TWO);
+                exchange.getIn().setHeader(InfinispanConstants.MAP, map);
+                exchange.getIn().setHeader(InfinispanConstants.OPERATION, 
InfinispanConstants.PUT_ALL);
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, 
new Long(LIFESPAN_TIME));
+                
exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, 
TimeUnit.SECONDS.toString());
+            }
+        });
+
+        assertEquals(currentCache().size(), 2);
+        Object value = currentCache().get(KEY_ONE);
+        assertEquals(value.toString(), VALUE_ONE);
+        value = currentCache().get(KEY_TWO);
+        assertEquals(value.toString(), VALUE_TWO);
+        
+        Thread.sleep(LIFESPAN_TIME * 1000);
+        
+        Exchange exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = 
exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+        
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_TWO);
+            }
+        });
+        resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, 
String.class);
+        assertEquals(null, resultGet);
+    }
+    
+    @Test
+    public void publishMapWithLifespanAndMaxIdleTime() throws Exception {
+        template.send("direct:start", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                Map<String, String> map = new HashMap<String, String>();
+                map.put(KEY_ONE, VALUE_ONE);
+                map.put(KEY_TWO, VALUE_TWO);
+                exchange.getIn().setHeader(InfinispanConstants.MAP, map);
+                exchange.getIn().setHeader(InfinispanConstants.OPERATION, 
InfinispanConstants.PUT_ALL);
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, 
new Long(LIFESPAN_FOR_MAX_IDLE));
+                
exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, 
TimeUnit.SECONDS.toString());
+                exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, 
new Long(MAX_IDLE_TIME));
+                
exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, 
TimeUnit.SECONDS.toString());
+            }
+        });
+
+        assertEquals(currentCache().size(), 2);
+        Object value = currentCache().get(KEY_ONE);
+        
+        Thread.sleep(10000);
+        
+        Exchange exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = 
exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+        
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_TWO);
+            }
+        });
+        resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, 
String.class);
+        assertEquals(null, resultGet);
     }
     
     @Test
@@ -72,7 +179,7 @@ public class InfinispanProducerTest extends 
InfinispanTestSupport {
         });
 
         Object value = currentCache().get(KEY_ONE);
-        assertThat(value.toString(), is(VALUE_ONE));
+        assertEquals(value.toString(), VALUE_ONE);
         assertEquals(currentCache().size(), 1);
     }
     
@@ -90,7 +197,7 @@ public class InfinispanProducerTest extends 
InfinispanTestSupport {
         });
 
         Object value = currentCache().get(KEY_TWO);
-        assertThat(value.toString(), is(VALUE_TWO));
+        assertEquals(value.toString(), VALUE_TWO);
         assertEquals(currentCache().size(), 2);
     }
     
@@ -106,7 +213,7 @@ public class InfinispanProducerTest extends 
InfinispanTestSupport {
             }
         });
 
-        assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), is(false));
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), false);
     }
     
     @Test
@@ -121,7 +228,7 @@ public class InfinispanProducerTest extends 
InfinispanTestSupport {
             }
         });
 
-        assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), is(true));
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), true);
     }
     
     @Test
@@ -136,7 +243,7 @@ public class InfinispanProducerTest extends 
InfinispanTestSupport {
             }
         });
 
-        assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), is(false));
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), false);
     }
     
     @Test
@@ -151,7 +258,45 @@ public class InfinispanProducerTest extends 
InfinispanTestSupport {
             }
         });
 
-        assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), is(true));
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), true);
+    }
+    
+    @Test
+    public void publishKeyAndValueWithLifespan() throws Exception {
+        template.send("direct:start", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, 
VALUE_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, 
new Long(LIFESPAN_TIME));
+                
exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, 
TimeUnit.SECONDS.toString());
+                exchange.getIn().setHeader(InfinispanConstants.OPERATION, 
InfinispanConstants.PUT);
+            }
+        });
+
+        Object value = currentCache().get(KEY_ONE);
+        assertEquals(value.toString(), VALUE_ONE);
+        
+        Exchange exchange;
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = 
exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(VALUE_ONE, resultGet);
+        
+        Thread.sleep(LIFESPAN_TIME * 1000);
+        
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, 
String.class);
+        assertEquals(null, resultGet);
     }
 
     @Test
@@ -167,7 +312,7 @@ public class InfinispanProducerTest extends 
InfinispanTestSupport {
             }
         });
 
-        assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
String.class), is("existing value"));
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
String.class), "existing value");
     }
 
     @Test
@@ -182,7 +327,7 @@ public class InfinispanProducerTest extends 
InfinispanTestSupport {
             }
         });
 
-        assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
String.class), is(VALUE_ONE));
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
String.class), VALUE_ONE);
     }
     
     @Test
@@ -198,7 +343,7 @@ public class InfinispanProducerTest extends 
InfinispanTestSupport {
             }
         });
 
-        assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
String.class), is(VALUE_ONE));
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
String.class), VALUE_ONE);
         assertEquals(currentCache().get(KEY_ONE), VALUE_TWO);
     }
 
@@ -214,16 +359,16 @@ public class InfinispanProducerTest extends 
InfinispanTestSupport {
             }
         });
 
-        assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
String.class), is(VALUE_ONE));
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
String.class), VALUE_ONE);
 
         Object value = currentCache().get(KEY_ONE);
-        assertThat(value, is(nullValue()));
+        assertEquals(value, null);
     }
 
     @Test
     public void clearsAllValues() throws Exception {
         currentCache().put(KEY_ONE, VALUE_ONE);
-        assertThat(currentCache().isEmpty(), is(false));
+        assertEquals(currentCache().isEmpty(), false);
 
         template.send("direct:start", new Processor() {
             @Override
@@ -232,7 +377,7 @@ public class InfinispanProducerTest extends 
InfinispanTestSupport {
             }
         });
 
-        assertThat(currentCache().isEmpty(), is(true));
+        assertEquals(currentCache().isEmpty(), true);
     }
 
     @Test

Reply via email to