CAMEL-8243 Enhance Infinispan listener support

- embedded and remote listeners
- clustered listeners
- custom listeners
- event data


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c7b1e905
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c7b1e905
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c7b1e905

Branch: refs/heads/master
Commit: c7b1e9050614e7c2a39d0f2d9c32f1c7fdc1e157
Parents: 80b0d0b
Author: Tristan Tarrant <ttarr...@redhat.com>
Authored: Thu Feb 18 20:01:38 2016 +0100
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Sat Feb 20 10:16:23 2016 +0100

----------------------------------------------------------------------
 components/camel-infinispan/pom.xml             |   7 +
 .../InfinispanAsyncEventListener.java           |  29 ---
 .../infinispan/InfinispanConfiguration.java     |  34 +++-
 .../infinispan/InfinispanConstants.java         |   1 +
 .../infinispan/InfinispanConsumer.java          |  50 ++++--
 .../infinispan/InfinispanConsumerHandler.java   |  24 +++
 .../infinispan/InfinispanCustomListener.java    |  38 ++++
 .../infinispan/InfinispanEventListener.java     |  46 +++++
 .../infinispan/InfinispanSyncEventListener.java |  61 -------
 .../InfinispanAsyncClusteredEventListener.java  |  28 +++
 .../InfinispanAsyncLocalEventListener.java      |  29 +++
 .../InfinispanConsumerEmbeddedHandler.java      |  63 +++++++
 .../InfinispanEmbeddedCustomListener.java       |  46 +++++
 .../InfinispanSyncClusteredEventListener.java   |  54 ++++++
 .../InfinispanSyncLocalEventListener.java       |  64 +++++++
 .../remote/InfinispanConsumerRemoteHandler.java |  58 ++++++
 .../remote/InfinispanRemoteCustomListener.java  |  68 +++++++
 .../remote/InfinispanRemoteEventListener.java   |  79 ++++++++
 .../infinispan/InfinispanAsyncConsumerTest.java |  50 ++++++
 .../InfinispanClusterTestSupport.java           | 133 ++++++++++++++
 .../InfinispanClusteredConsumerTest.java        | 106 +++++++++++
 ...InfinispanLocalConsumerCustomListenerIT.java |  63 +++++++
 .../infinispan/InfinispanLocalConsumerTest.java |  81 +++++++++
 .../infinispan/InfinispanProducerTest.java      | 179 ++++++++++---------
 .../infinispan/InfinispanTestSupport.java       |  13 ++
 .../infinispan/MyEmbeddedCustomListener.java    |  44 +++++
 26 files changed, 1251 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/pom.xml b/components/camel-infinispan/pom.xml
index c6e4c07..7dd09bd 100644
--- a/components/camel-infinispan/pom.xml
+++ b/components/camel-infinispan/pom.xml
@@ -58,6 +58,13 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.infinispan</groupId>
+            <artifactId>infinispan-core</artifactId>
+            <version>${infinispan-version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanAsyncEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanAsyncEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanAsyncEventListener.java
deleted file mode 100644
index 85366a5..0000000
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanAsyncEventListener.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan;
-
-import java.util.Set;
-
-import org.infinispan.notifications.Listener;
-
-@Listener(sync = false)
-public class InfinispanAsyncEventListener extends InfinispanSyncEventListener {
-
-    public InfinispanAsyncEventListener(InfinispanConsumer consumer, Set<String> eventTypes) {
-        super(consumer, eventTypes);
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
index a738ce4..100ff9e 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
@@ -41,6 +41,10 @@ public class InfinispanConfiguration {
     private boolean sync = true;
     @UriParam(label = "consumer", javaType = "java.lang.String")
     private Set<String> eventTypes;
+    @UriParam
+    private InfinispanCustomListener customListener;
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean clustered;
 
     public String getCommand() {
         return command;
@@ -66,7 +70,7 @@ public class InfinispanConfiguration {
 
     /**
      * Specifies the cache Container to connect
-     */   
+     */
     public BasicCacheContainer getCacheContainer() {
         return cacheContainer;
     }
@@ -77,7 +81,7 @@ public class InfinispanConfiguration {
 
     /**
      * Specifies the cache name
-     */  
+     */
     public String getCacheName() {
         return cacheName;
     }
@@ -97,6 +101,17 @@ public class InfinispanConfiguration {
         this.sync = sync;
     }
 
+    /**
+     * If true, the listener will be installed for the entire cluster
+     */
+    public boolean isClustered() {
+        return clustered;
+    }
+
+    public void setClustered(boolean clustered) {
+        this.clustered = clustered;
+    }
+
     public Set<String> getEventTypes() {
         return eventTypes;
     }
@@ -122,4 +137,19 @@ public class InfinispanConfiguration {
     public void setEventTypes(String eventTypes) {
         this.eventTypes = new HashSet<String>(Arrays.asList(eventTypes.split(",")));
     }
+
+    /**
+     * Returns the custom listener in use, if provided
+     */
+    public InfinispanCustomListener getCustomListener() {
+        return customListener;
+    }
+
+    public void setCustomListener(InfinispanCustomListener customListener) {
+        this.customListener = customListener;
+    }
+
+    public boolean isCustom() {
+        return customListener != null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
index a6ae8a5..681ac65 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
@@ -47,4 +47,5 @@ public interface InfinispanConstants {
     String MAX_IDLE_TIME = "CamelInfinispanMaxIdleTime";
     String MAX_IDLE_TIME_UNIT = "CamelInfinispanMaxIdleTimeUnit";
     String IGNORE_RETURN_VALUES = "CamelInfinispanIgnoreReturnValues";
+    String EVENT_DATA = "CamelInfinispanEventData";
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
index cb5161a..90a5e3e 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
@@ -18,8 +18,12 @@ package org.apache.camel.component.infinispan;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.infinispan.embedded.InfinispanConsumerEmbeddedHandler;
+import org.apache.camel.component.infinispan.remote.InfinispanConsumerRemoteHandler;
 import org.apache.camel.impl.DefaultConsumer;
 import org.infinispan.Cache;
+import org.infinispan.commons.api.BasicCache;
+import org.infinispan.commons.api.BasicCacheContainer;
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,25 +31,28 @@ import org.slf4j.LoggerFactory;
 public class InfinispanConsumer extends DefaultConsumer {
     private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class);
     private final InfinispanConfiguration configuration;
-    private final InfinispanSyncEventListener listener;
+    private InfinispanEventListener listener;
     private EmbeddedCacheManager cacheManager;
+    private BasicCache<Object, Object> cache;
 
     public InfinispanConsumer(InfinispanEndpoint endpoint, Processor processor, InfinispanConfiguration configuration) {
         super(endpoint, processor);
         this.configuration = configuration;
-        if (configuration.isSync()) {
-            listener = new InfinispanSyncEventListener(this, configuration.getEventTypes());
-        } else {
-            listener = new InfinispanAsyncEventListener(this, configuration.getEventTypes());
-        }
     }
 
     public void processEvent(String eventType, boolean isPre, String cacheName, Object key) {
+        processEvent(eventType, isPre, cacheName, key, null);
+    }
+
+    public void processEvent(String eventType, boolean isPre, String cacheName, Object key, Object eventData) {
         Exchange exchange = getEndpoint().createExchange();
         exchange.getOut().setHeader(InfinispanConstants.EVENT_TYPE, eventType);
         exchange.getOut().setHeader(InfinispanConstants.IS_PRE, isPre);
         exchange.getOut().setHeader(InfinispanConstants.CACHE_NAME, cacheName);
         exchange.getOut().setHeader(InfinispanConstants.KEY, key);
+        if (eventData != null) {
+            exchange.getOut().setHeader(InfinispanConstants.EVENT_DATA, eventData);
+        }
 
         try {
             getProcessor().process(exchange);
@@ -56,19 +63,18 @@ public class InfinispanConsumer extends DefaultConsumer {
 
     @Override
     protected void doStart() throws Exception {
-        if (configuration.getCacheContainer() instanceof EmbeddedCacheManager) {
-            cacheManager = (EmbeddedCacheManager) configuration.getCacheContainer();
-            Cache<Object, Object> cache;
-            if (configuration.getCacheName() != null) {
-                cache = cacheManager.getCache(configuration.getCacheName());
-            } else {
-                cache = cacheManager.getCache();
-            }
-            cache.addListener(listener);
+        BasicCacheContainer cacheContainer = configuration.getCacheContainer();
+        String cacheName = configuration.getCacheName();
+        cache = cacheName == null ? cacheContainer.getCache() : cacheContainer.getCache(cacheName);
+        if (InfinispanUtil.isEmbedded(cacheContainer)) {
+            listener = InfinispanConsumerEmbeddedHandler.INSTANCE.start(this);
+        } else if (InfinispanUtil.isRemote(cacheContainer)) {
+            listener = InfinispanConsumerRemoteHandler.INSTANCE.start(this);
         } else {
-            throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + configuration.getCacheContainer());
+            throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + cacheContainer);
         }
         super.doStart();
+
     }
 
     @Override
@@ -78,4 +84,16 @@ public class InfinispanConsumer extends DefaultConsumer {
         }
         super.doStop();
     }
+
+    public BasicCache<Object, Object> getCache() {
+        return cache;
+    }
+
+    public InfinispanEventListener getListener() {
+        return listener;
+    }
+
+    public InfinispanConfiguration getConfiguration() {
+        return configuration;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumerHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumerHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumerHandler.java
new file mode 100644
index 0000000..4f83ad9
--- /dev/null
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumerHandler.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+
+public interface InfinispanConsumerHandler {
+    InfinispanEventListener start(InfinispanConsumer consumer);
+
+    void stop(InfinispanConsumer consumer);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanCustomListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanCustomListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanCustomListener.java
new file mode 100644
index 0000000..46c932f
--- /dev/null
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanCustomListener.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+import java.util.Set;
+
+/**
+ * This class is supposed to be extended by users and annotated with @Listener or @ClientListener
+ * and passed to the consumer endpoint through the 'customListener' parameter.
+ */
+public abstract class InfinispanCustomListener extends InfinispanEventListener {
+
+    public InfinispanCustomListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
+        super(infinispanConsumer, eventTypes);
+    }
+
+    public InfinispanConsumer getInfinispanConsumer() {
+        return infinispanConsumer;
+    }
+
+    public void setInfinispanConsumer(InfinispanConsumer infinispanConsumer) {
+        this.infinispanConsumer = infinispanConsumer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanEventListener.java
new file mode 100644
index 0000000..f40bb57
--- /dev/null
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanEventListener.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+import java.util.Set;
+
+public abstract class InfinispanEventListener {
+    protected InfinispanConsumer infinispanConsumer;
+    protected Set<String> eventTypes;
+    protected String cacheName;
+
+    public InfinispanEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
+        this.infinispanConsumer = infinispanConsumer;
+        this.eventTypes = eventTypes;
+    }
+
+    public void setInfinispanConsumer(InfinispanConsumer infinispanConsumer) {
+        this.infinispanConsumer = infinispanConsumer;
+    }
+
+    public void setEventTypes(Set<String> eventTypes) {
+        this.eventTypes = eventTypes;
+    }
+
+    public void setCacheName(String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+    protected boolean isAccepted(String eventType) {
+        return eventTypes == null || eventTypes.isEmpty() || eventTypes.contains(eventType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanSyncEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanSyncEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanSyncEventListener.java
deleted file mode 100644
index 2ae6f00..0000000
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanSyncEventListener.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan;
-
-import java.util.Set;
-
-import org.infinispan.notifications.Listener;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryActivated;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidated;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryLoaded;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryPassivated;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited;
-import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Listener(sync = true)
-public class InfinispanSyncEventListener {
-    private final transient Logger logger = LoggerFactory.getLogger(this.getClass());
-    private final InfinispanConsumer infinispanConsumer;
-    private final Set<String> eventTypes;
-
-    public InfinispanSyncEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
-        this.infinispanConsumer = infinispanConsumer;
-        this.eventTypes = eventTypes;
-    }
-
-    @CacheEntryActivated
-    @CacheEntryCreated
-    @CacheEntryInvalidated
-    @CacheEntryLoaded
-    @CacheEntryModified
-    @CacheEntryPassivated
-    @CacheEntryRemoved
-    @CacheEntryVisited
-    public void processEvent(CacheEntryEvent<Object, Object> event) {
-        logger.trace("Received CacheEntryEvent [{}]", event);
-
-        if (eventTypes == null || eventTypes.isEmpty() || eventTypes.contains(event.getType().toString())) {
-            infinispanConsumer.processEvent(event.getType().toString(), event.isPre(), event.getCache().getName(), event.getKey());
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncClusteredEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncClusteredEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncClusteredEventListener.java
new file mode 100644
index 0000000..8ef544f
--- /dev/null
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncClusteredEventListener.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded;
+
+import java.util.Set;
+import org.apache.camel.component.infinispan.InfinispanConsumer;
+import org.infinispan.notifications.Listener;
+
+@Listener(clustered = true, sync = false)
+public class InfinispanAsyncClusteredEventListener extends InfinispanSyncLocalEventListener {
+    public InfinispanAsyncClusteredEventListener(InfinispanConsumer consumer, Set<String> eventTypes) {
+        super(consumer, eventTypes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
new file mode 100644
index 0000000..4f50950
--- /dev/null
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded;
+
+import java.util.Set;
+import org.apache.camel.component.infinispan.InfinispanConsumer;
+import org.infinispan.notifications.Listener;
+
+@Listener(clustered = false, sync = false)
+public class InfinispanAsyncLocalEventListener extends InfinispanSyncLocalEventListener {
+
+    public InfinispanAsyncLocalEventListener(InfinispanConsumer consumer, Set<String> eventTypes) {
+        super(consumer, eventTypes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
new file mode 100644
index 0000000..d7844e6
--- /dev/null
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded;
+
+import org.apache.camel.component.infinispan.InfinispanConfiguration;
+import org.apache.camel.component.infinispan.InfinispanConsumer;
+import org.apache.camel.component.infinispan.InfinispanConsumerHandler;
+import org.apache.camel.component.infinispan.InfinispanCustomListener;
+import org.apache.camel.component.infinispan.InfinispanEventListener;
+import org.infinispan.Cache;
+
+public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsumerHandler {
+    public static final InfinispanConsumerHandler INSTANCE = new InfinispanConsumerEmbeddedHandler();
+
+    private InfinispanConsumerEmbeddedHandler() {
+    }
+
+    @Override
+    public InfinispanEventListener start(InfinispanConsumer consumer) {
+        Cache<?, ?> embeddedCache = (Cache<?, ?>) consumer.getCache();
+        InfinispanConfiguration configuration = consumer.getConfiguration();
+        InfinispanEventListener listener;
+        if (configuration.isCustom()) {
+            listener = configuration.getCustomListener();
+            ((InfinispanCustomListener)listener).setInfinispanConsumer(consumer);
+        } else if (configuration.isClustered()) {
+            if (configuration.isSync()) {
+                listener = new InfinispanSyncClusteredEventListener(consumer, configuration.getEventTypes());
+            } else {
+                listener = new InfinispanAsyncClusteredEventListener(consumer, configuration.getEventTypes());
+            }
+        } else {
+            if (configuration.isSync()) {
+                listener = new InfinispanSyncLocalEventListener(consumer, configuration.getEventTypes());
+            } else {
+                listener = new InfinispanAsyncLocalEventListener(consumer, configuration.getEventTypes());
+            }
+        }
+        embeddedCache.addListener(listener);
+        return listener;
+    }
+
+    @Override
+    public void stop(InfinispanConsumer consumer) {
+        Cache<?, ?> embeddedCache = (Cache<?, ?>) consumer.getCache();
+        embeddedCache.removeListener(consumer.getListener());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedCustomListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedCustomListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedCustomListener.java
new file mode 100644
index 0000000..040f406
--- /dev/null
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedCustomListener.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded;
+
+import org.apache.camel.component.infinispan.InfinispanCustomListener;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
+import 
org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
+import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
+
+/**
+ * Base class for user-provided listeners on embedded caches. Subclasses are
+ * expected to carry the @Listener annotation and to be handed to the consumer
+ * endpoint through the 'customListener' parameter.
+ */
+public abstract class InfinispanEmbeddedCustomListener extends InfinispanCustomListener {
+
+    /**
+     * Creates the listener without a consumer or event type filter; both
+     * superclass arguments are passed as {@code null}.
+     */
+    public InfinispanEmbeddedCustomListener() {
+        super(null, null);
+    }
+
+    /**
+     * Forwards created, modified, removed and expired cache entry events to
+     * the consumer when the event type is accepted.
+     *
+     * @param event the cache entry event raised by Infinispan
+     */
+    @CacheEntryCreated
+    @CacheEntryModified
+    @CacheEntryRemoved
+    @CacheEntryExpired
+    public void processEvent(CacheEntryEvent<Object, Object> event) {
+        final String eventType = event.getType().toString();
+        if (!isAccepted(eventType)) {
+            return;
+        }
+        infinispanConsumer.processEvent(eventType, event.isPre(), event.getCache().getName(), event.getKey());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
new file mode 100644
index 0000000..bcf6957
--- /dev/null
+++ 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded;
+
+import java.util.Set;
+import org.apache.camel.component.infinispan.InfinispanConsumer;
+import org.apache.camel.component.infinispan.InfinispanEventListener;
+import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
+import 
org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
+import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Synchronous, clustered listener that forwards accepted cache entry events
+ * to the owning {@link InfinispanConsumer}.
+ */
+@Listener(clustered = true, sync = true)
+public class InfinispanSyncClusteredEventListener extends InfinispanEventListener {
+    /* Clustered listeners only listen for post events, so the "pre" flag is always false. */
+    private static final boolean IS_PRE = false;
+
+    private final transient Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /**
+     * @param infinispanConsumer the consumer that receives the events
+     * @param eventTypes the event type names handed to the superclass for filtering
+     */
+    public InfinispanSyncClusteredEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
+        super(infinispanConsumer, eventTypes);
+    }
+
+    /**
+     * Traces the incoming event and, when its type is accepted, passes it to
+     * the consumer as a post event.
+     *
+     * @param event the cache entry event raised by Infinispan
+     */
+    @CacheEntryCreated
+    @CacheEntryModified
+    @CacheEntryRemoved
+    @CacheEntryExpired
+    public void processEvent(CacheEntryEvent<Object, Object> event) {
+        logger.trace("Received CacheEntryEvent [{}]", event);
+
+        final String eventType = event.getType().toString();
+        if (!isAccepted(eventType)) {
+            return;
+        }
+        infinispanConsumer.processEvent(eventType, IS_PRE, event.getCache().getName(), event.getKey());
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
new file mode 100644
index 0000000..1cf9325
--- /dev/null
+++ 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded;
+
+import java.util.Set;
+import org.apache.camel.component.infinispan.InfinispanConsumer;
+import org.apache.camel.component.infinispan.InfinispanEventListener;
+import org.infinispan.notifications.Listener;
+import 
org.infinispan.notifications.cachelistener.annotation.CacheEntryActivated;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
+import 
org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidated;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryLoaded;
+import 
org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
+import 
org.infinispan.notifications.cachelistener.annotation.CacheEntryPassivated;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited;
+import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Synchronous, non-clustered listener that forwards accepted cache entry
+ * events to the owning {@link InfinispanConsumer}.
+ */
+@Listener(clustered = false, sync = true)
+public class InfinispanSyncLocalEventListener extends InfinispanEventListener {
+    private final transient Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /**
+     * @param infinispanConsumer the consumer that receives the events
+     * @param eventTypes the event type names handed to the superclass for filtering
+     */
+    public InfinispanSyncLocalEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
+        super(infinispanConsumer, eventTypes);
+    }
+
+    /**
+     * Traces the incoming event and, when its type is accepted, passes the
+     * event type, pre/post flag, cache name and key to the consumer. No event
+     * data is supplied (the last argument is {@code null}).
+     *
+     * @param event the cache entry event raised by Infinispan
+     */
+    @CacheEntryActivated
+    @CacheEntryCreated
+    @CacheEntryInvalidated
+    @CacheEntryLoaded
+    @CacheEntryModified
+    @CacheEntryPassivated
+    @CacheEntryRemoved
+    @CacheEntryVisited
+    @CacheEntryExpired
+    public void processEvent(CacheEntryEvent<Object, Object> event) {
+        logger.trace("Received CacheEntryEvent [{}]", event);
+
+        // Read every attribute up front, before the filter is consulted.
+        final String eventType = event.getType().toString();
+        final boolean pre = event.isPre();
+        final String sourceCacheName = event.getCache().getName();
+        final Object key = event.getKey();
+
+        if (isAccepted(eventType)) {
+            infinispanConsumer.processEvent(eventType, pre, sourceCacheName, key, null);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java
 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java
new file mode 100644
index 0000000..48ec911
--- /dev/null
+++ 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote;
+
+import org.apache.camel.component.infinispan.InfinispanConfiguration;
+import org.apache.camel.component.infinispan.InfinispanConsumer;
+import org.apache.camel.component.infinispan.InfinispanConsumerHandler;
+import org.apache.camel.component.infinispan.InfinispanCustomListener;
+import org.apache.camel.component.infinispan.InfinispanEventListener;
+import org.infinispan.client.hotrod.RemoteCache;
+
+/**
+ * {@link InfinispanConsumerHandler} for remote (Hot Rod) caches: registers and
+ * unregisters client listeners on the consumer's {@link RemoteCache}.
+ */
+public final class InfinispanConsumerRemoteHandler implements InfinispanConsumerHandler {
+    public static final InfinispanConsumerHandler INSTANCE = new InfinispanConsumerRemoteHandler();
+
+    private InfinispanConsumerRemoteHandler() {
+    }
+
+    /**
+     * Creates (or reuses the configured custom) listener and registers it as
+     * a client listener on the consumer's remote cache.
+     *
+     * @param consumer the consumer whose cache and configuration are used
+     * @return the listener that was registered
+     * @throws UnsupportedOperationException if the configuration asks for a sync listener
+     */
+    @Override
+    public InfinispanEventListener start(InfinispanConsumer consumer) {
+        InfinispanConfiguration configuration = consumer.getConfiguration();
+        if (configuration.isSync()) {
+            throw new UnsupportedOperationException("Sync listeners not supported for remote caches.");
+        }
+        RemoteCache<?, ?> remoteCache = (RemoteCache<?, ?>) consumer.getCache();
+        InfinispanEventListener listener;
+        if (configuration.isCustom()) {
+            listener = configuration.getCustomListener();
+            ((InfinispanCustomListener) listener).setInfinispanConsumer(consumer);
+        } else {
+            listener = new InfinispanRemoteEventListener(consumer, configuration.getEventTypes());
+        }
+        // The cache name must be in place before the listener is registered:
+        // events may be delivered as soon as registration happens, and the
+        // listeners pass this name on to the consumer.
+        listener.setCacheName(remoteCache.getName());
+        remoteCache.addClientListener(listener);
+        return listener;
+    }
+
+    /**
+     * Unregisters the consumer's listener from the consumer's remote cache.
+     *
+     * @param consumer supplies both the cache and the listener to remove
+     */
+    @Override
+    public void stop(InfinispanConsumer consumer) {
+        RemoteCache<?, ?> remoteCache = (RemoteCache<?, ?>) consumer.getCache();
+        remoteCache.removeClientListener(consumer.getListener());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteCustomListener.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteCustomListener.java
 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteCustomListener.java
new file mode 100644
index 0000000..7ab0447
--- /dev/null
+++ 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteCustomListener.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote;
+
+import org.apache.camel.component.infinispan.InfinispanCustomListener;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
+import org.infinispan.client.hotrod.annotation.ClientCacheFailover;
+import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryCustomEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
+import org.infinispan.client.hotrod.event.ClientEvent;
+
+/**
+ * This class is supposed to be extended by users and annotated with @ClientListener
+ * and passed to the consumer endpoint through the 'customListener' parameter.
+ */
+public abstract class InfinispanRemoteCustomListener extends InfinispanCustomListener {
+    /**
+     * Creates the listener without a consumer or event type filter; both
+     * superclass arguments are passed as {@code null}.
+     */
+    public InfinispanRemoteCustomListener() {
+        super(null, null);
+    }
+
+    /**
+     * Forwards created, modified, removed, expired and failover client events
+     * to the consumer when the event type is accepted. Remote events are
+     * always reported with the "pre" flag set to {@code false}.
+     *
+     * @param event the client event received from the remote cache
+     */
+    @ClientCacheEntryCreated
+    @ClientCacheEntryModified
+    @ClientCacheEntryRemoved
+    @ClientCacheEntryExpired
+    @ClientCacheFailover
+    public void processClientEvent(ClientEvent event) {
+        final String eventType = event.getType().toString();
+        if (isAccepted(eventType)) {
+            infinispanConsumer.processEvent(eventType, false, cacheName, getKey(event), getEventData(event));
+        }
+    }
+
+    /**
+     * Extracts the entry key from the event types that carry one.
+     *
+     * @return the key, or {@code null} for events without a key
+     */
+    private Object getKey(ClientEvent event) {
+        if (event instanceof ClientCacheEntryCreatedEvent) {
+            return ((ClientCacheEntryCreatedEvent<?>) event).getKey();
+        } else if (event instanceof ClientCacheEntryModifiedEvent) {
+            return ((ClientCacheEntryModifiedEvent<?>) event).getKey();
+        } else if (event instanceof ClientCacheEntryRemovedEvent) {
+            return ((ClientCacheEntryRemovedEvent<?>) event).getKey();
+        } else if (event instanceof ClientCacheEntryExpiredEvent) {
+            // Expired events are subscribed to above, so their key must be reported too.
+            return ((ClientCacheEntryExpiredEvent<?>) event).getKey();
+        }
+        return null;
+    }
+
+    /**
+     * Extracts the payload of a custom event.
+     *
+     * @return the event data, or {@code null} when the event is not a custom event
+     */
+    private Object getEventData(ClientEvent e) {
+        if (e instanceof ClientCacheEntryCustomEvent) {
+            return ((ClientCacheEntryCustomEvent<?>) e).getEventData();
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteEventListener.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteEventListener.java
 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteEventListener.java
new file mode 100644
index 0000000..40807eb
--- /dev/null
+++ 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteEventListener.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote;
+
+import java.util.Set;
+import org.apache.camel.component.infinispan.InfinispanConsumer;
+import org.apache.camel.component.infinispan.InfinispanEventListener;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
+import org.infinispan.client.hotrod.annotation.ClientCacheFailover;
+import org.infinispan.client.hotrod.annotation.ClientListener;
+import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheFailoverEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client listener for remote caches that forwards accepted events to the
+ * owning {@link InfinispanConsumer}. Every event is reported with the "pre"
+ * flag set to {@code false} and without event data.
+ */
+@ClientListener
+public class InfinispanRemoteEventListener extends InfinispanEventListener {
+    private final transient Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /**
+     * @param infinispanConsumer the consumer that receives the events
+     * @param eventTypes the event type names handed to the superclass for filtering
+     */
+    public InfinispanRemoteEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
+        super(infinispanConsumer, eventTypes);
+    }
+
+    /** Handles entry-created events. */
+    @ClientCacheEntryCreated
+    public void processEvent(ClientCacheEntryCreatedEvent<Object> event) {
+        logger.trace("Received ClientEvent [{}]", event);
+        forward(event.getType().toString(), event.getKey());
+    }
+
+    /** Handles entry-modified events. */
+    @ClientCacheEntryModified
+    public void processEvent(ClientCacheEntryModifiedEvent<Object> event) {
+        logger.trace("Received ClientEvent [{}]", event);
+        forward(event.getType().toString(), event.getKey());
+    }
+
+    /** Handles entry-removed events. */
+    @ClientCacheEntryRemoved
+    public void processEvent(ClientCacheEntryRemovedEvent<Object> event) {
+        logger.trace("Received ClientEvent [{}]", event);
+        forward(event.getType().toString(), event.getKey());
+    }
+
+    /** Handles failover events, which carry no key. */
+    @ClientCacheFailover
+    public void processEvent(ClientCacheFailoverEvent event) {
+        logger.trace("Received ClientEvent [{}]", event);
+        forward(event.getType().toString(), null);
+    }
+
+    /** Handles entry-expired events. */
+    @ClientCacheEntryExpired
+    public void processEvent(ClientCacheEntryExpiredEvent<Object> event) {
+        logger.trace("Received ClientEvent [{}]", event);
+        forward(event.getType().toString(), event.getKey());
+    }
+
+    /**
+     * Passes the event on to the consumer when its type is accepted.
+     *
+     * @param eventType the event type name
+     * @param key the entry key, or {@code null} when the event has none
+     */
+    private void forward(String eventType, Object key) {
+        if (!isAccepted(eventType)) {
+            return;
+        }
+        infinispanConsumer.processEvent(eventType, false, cacheName, key, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanAsyncConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanAsyncConsumerTest.java
 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanAsyncConsumerTest.java
new file mode 100644
index 0000000..dc44483
--- /dev/null
+++ 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanAsyncConsumerTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * Exercises the Infinispan consumer with {@code sync=false}, listening for
+ * CACHE_ENTRY_CREATED events only.
+ */
+public class InfinispanAsyncConsumerTest extends InfinispanTestSupport {
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mockResult;
+
+    /**
+     * A single put is expected to yield two exchanges on the mock endpoint
+     * (per the test name, the pre and the post notification).
+     */
+    @Test
+    public void consumerReceivedPreAndPostEntryCreatedEventNotifications() throws Exception {
+        mockResult.expectedMessageCount(2);
+        mockResult.setResultMinimumWaitTime(900);
+
+        currentCache().put(KEY_ONE, VALUE_ONE);
+        mockResult.assertIsSatisfied();
+    }
+
+    /**
+     * Routes the consumed cache events through a 500 ms delayer into the mock endpoint.
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        final String listenerUri = "infinispan://localhost?cacheContainer=#cacheContainer&sync=false&eventTypes=CACHE_ENTRY_CREATED";
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(listenerUri)
+                        .delayer(500)
+                        .to("mock:result");
+            }
+        };
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
new file mode 100644
index 0000000..b47b1db
--- /dev/null
+++ 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+import java.util.List;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.infinispan.Cache;
+import org.infinispan.commons.api.BasicCacheContainer;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.transaction.TransactionMode;
+import org.infinispan.util.ControlledTimeService;
+import org.infinispan.util.TimeService;
+import org.junit.Before;
+
+/**
+ * Base class for Camel tests that need a small cluster of embedded Infinispan
+ * cache managers. The first cache manager is bound in the registry under the
+ * name "cacheContainer".
+ */
+public class InfinispanClusterTestSupport extends CamelTestSupport {
+
+    protected static final String KEY_ONE = "keyOne";
+    protected static final String VALUE_ONE = "valueOne";
+
+    protected List<EmbeddedCacheManager> clusteredCacheContainers;
+
+    protected ControlledTimeService ts0;
+    protected ControlledTimeService ts1;
+
+    /**
+     * Thin adapter over Infinispan's {@link MultipleCacheManagersTest} that
+     * creates the clustered cache managers for a given cache mode, optional
+     * cache name, transaction setting and cluster size.
+     */
+    protected static class ClusteredCacheSupport extends MultipleCacheManagersTest {
+
+        protected ConfigurationBuilder builderUsed;
+        protected final boolean tx;
+        protected final CacheMode cacheMode;
+        protected String cacheName;
+        protected final int clusterSize;
+
+        /**
+         * Creates a cluster description without a cache name.
+         */
+        public ClusteredCacheSupport(CacheMode cacheMode, boolean tx, int clusterSize) {
+            this(cacheMode, null, tx, clusterSize);
+        }
+
+        /**
+         * @param cacheMode the clustering mode of the caches
+         * @param cacheName the cache name to create, or {@code null} to use the
+         *                  overload of createClusteredCaches without a name
+         * @param tx whether the caches are configured as transactional
+         * @param clusterSize the number of cache managers to create
+         */
+        public ClusteredCacheSupport(CacheMode cacheMode, String cacheName, boolean tx, int clusterSize) {
+            this.tx = tx;
+            this.cacheMode = cacheMode;
+            this.cacheName = cacheName;
+            this.clusterSize = clusterSize;
+        }
+
+        @Override
+        public void createCacheManagers() throws Throwable {
+            builderUsed = new ConfigurationBuilder();
+            builderUsed.clustering().cacheMode(cacheMode);
+            if (tx) {
+                builderUsed.transaction().transactionMode(TransactionMode.TRANSACTIONAL);
+            }
+            if (cacheMode.isDistributed()) {
+                // a single owner per entry
+                builderUsed.clustering().hash().numOwners(1);
+            }
+            if (cacheName != null) {
+                createClusteredCaches(clusterSize, cacheName, builderUsed);
+            } else {
+                createClusteredCaches(clusterSize, builderUsed);
+            }
+        }
+    }
+
+    /**
+     * Starts a two-node DIST_SYNC, non-transactional cluster and then runs the
+     * regular Camel test set-up.
+     *
+     * @throws Exception wrapping any failure raised while creating the cache managers
+     */
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        ClusteredCacheSupport cluster = new ClusteredCacheSupport(CacheMode.DIST_SYNC, false, 2);
+        try {
+            cluster.createCacheManagers();
+            clusteredCacheContainers = cluster.getCacheManagers();
+        } catch (Throwable ex) {
+            throw new Exception(ex);
+        }
+        super.setUp();
+    }
+
+    /**
+     * Stops every cache container and then runs the regular Camel test
+     * tear-down. The Camel tear-down runs even when stopping a container
+     * fails, and a failed set-up (no containers created) is tolerated.
+     */
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (clusteredCacheContainers != null) {
+                for (BasicCacheContainer container : clusteredCacheContainers) {
+                    container.stop();
+                }
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Binds the first cache manager of the cluster as "cacheContainer".
+     */
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        registry.bind("cacheContainer", clusteredCacheContainers.get(0));
+        return registry;
+    }
+
+    /** Returns the default cache of the first cache manager. */
+    protected Cache<Object, Object> defaultCache() {
+        return clusteredCacheContainers.get(0).getCache();
+    }
+
+    /** Returns the default cache of the cache manager at the given index. */
+    protected Cache<Object, Object> defaultCache(int index) {
+        return clusteredCacheContainers.get(index).getCache();
+    }
+
+    /** Returns the named cache of the first cache manager. */
+    protected Cache<Object, Object> namedCache(String name) {
+        return clusteredCacheContainers.get(0).getCache(name);
+    }
+
+    /** Returns the named cache of the cache manager at the given index. */
+    protected Cache<Object, Object> namedCache(int index, String name) {
+        return clusteredCacheContainers.get(index).getCache(name);
+    }
+
+    /**
+     * Replaces the {@link TimeService} of the first two cache managers with
+     * controllable instances (exposed as {@code ts0} and {@code ts1}), both
+     * constructed with a start value of 0.
+     */
+    protected void injectTimeService() {
+        ts0 = new ControlledTimeService(0);
+        TestingUtil.replaceComponent(clusteredCacheContainers.get(0), TimeService.class, ts0, true);
+        ts1 = new ControlledTimeService(0);
+        TestingUtil.replaceComponent(clusteredCacheContainers.get(1), TimeService.class, ts1, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusteredConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusteredConsumerTest.java
 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusteredConsumerTest.java
new file mode 100644
index 0000000..ab02821
--- /dev/null
+++ 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusteredConsumerTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.infinispan.distribution.MagicKey;
+import org.infinispan.test.fwk.TestResourceTracker;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Exercises two consumer routes declared with clustered=true: one subscribed
+ * to CACHE_ENTRY_CREATED and one to CACHE_ENTRY_EXPIRED. Entries are written
+ * through the cache of the container at index 1 while the routes use the
+ * container bound as "cacheContainer".
+ */
+public class InfinispanClusteredConsumerTest extends InfinispanClusterTestSupport {
+
+    private static final long WAIT_TIMEOUT = 5000;
+
+    @EndpointInject(uri = "mock:resultCreated")
+    private MockEndpoint mockResultCreatedEvents;
+
+    @EndpointInject(uri = "mock:resultExpired")
+    private MockEndpoint mockResultExpiredEvents;
+
+    @BeforeClass
+    public static void beforeClass() {
+        TestResourceTracker.testStarted(InfinispanClusteredConsumerTest.class.getName());
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        TestResourceTracker.testFinished(InfinispanClusteredConsumerTest.class.getName());
+    }
+
+    /**
+     * A plain put must produce exactly one created notification, flagged as a
+     * post-event and carrying the string form of the key.
+     */
+    @Test
+    public void consumerReceivedPostEntryCreatedEventNotifications() throws Exception {
+        final MagicKey key = new MagicKey(defaultCache(1));
+
+        mockResultCreatedEvents.expectedMessageCount(1);
+        expectFirstPostEvent(mockResultCreatedEvents, "CACHE_ENTRY_CREATED", key);
+
+        defaultCache(1).put(key, "value");
+
+        mockResultCreatedEvents.assertIsSatisfied();
+    }
+
+    /**
+     * A put with a 1000 ms lifespan followed by advancing both controlled
+     * clocks by 1001 ms must produce one created and one expired notification.
+     */
+    @Test
+    public void consumerReceivedExpirationEventNotifications() throws Exception {
+        final MagicKey key = new MagicKey(defaultCache(1));
+
+        mockResultCreatedEvents.expectedMessageCount(1);
+        mockResultExpiredEvents.expectedMessageCount(1);
+
+        expectFirstPostEvent(mockResultCreatedEvents, "CACHE_ENTRY_CREATED", key);
+        expectFirstPostEvent(mockResultExpiredEvents, "CACHE_ENTRY_EXPIRED", key);
+
+        injectTimeService();
+
+        defaultCache(1).put(key, "value", 1000, TimeUnit.MILLISECONDS);
+
+        // Move both controlled clocks past the entry's lifespan.
+        ts0.advance(1001);
+        ts1.advance(1001);
+
+        assertNull(defaultCache(1).get(key));
+        mockResultCreatedEvents.assertIsSatisfied();
+        mockResultExpiredEvents.assertIsSatisfied(WAIT_TIMEOUT);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("infinispan://localhost?cacheContainer=#cacheContainer&clustered=true&eventTypes=CACHE_ENTRY_CREATED")
+                        .to("mock:resultCreated");
+                from("infinispan://localhost?cacheContainer=#cacheContainer&clustered=true&eventTypes=CACHE_ENTRY_EXPIRED")
+                        .to("mock:resultExpired");
+            }
+        };
+    }
+
+    /**
+     * Registers the header expectations shared by both tests on the first
+     * message of the given endpoint: event type, IS_PRE == false, a non-null
+     * cache name, and the key's string form.
+     */
+    private static void expectFirstPostEvent(MockEndpoint mock, String eventType, MagicKey key) {
+        mock.message(0).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo(eventType);
+        mock.message(0).outHeader(InfinispanConstants.IS_PRE).isEqualTo(false);
+        mock.message(0).outHeader(InfinispanConstants.CACHE_NAME).isNotNull();
+        mock.message(0).outHeader(InfinispanConstants.KEY).isEqualTo(key.toString());
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerCustomListenerIT.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerCustomListenerIT.java
 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerCustomListenerIT.java
new file mode 100644
index 0000000..40015e9
--- /dev/null
+++ 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerCustomListenerIT.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+/**
+ * Runs a consumer route against the cache named by CUSTOM_CACHE_NAME with a
+ * MyEmbeddedCustomListener instance, bound in the registry as
+ * "myCustomListener", supplied through the customListener endpoint option.
+ */
+public class InfinispanLocalConsumerCustomListenerIT extends InfinispanTestSupport {
+
+    private static final String CUSTOM_CACHE_NAME = "customCacheName";
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mockResult;
+
+    /**
+     * Adds the custom listener to the registry created by the parent class so
+     * the route can reference it as #myCustomListener.
+     */
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        registry.bind("myCustomListener", new MyEmbeddedCustomListener(CUSTOM_CACHE_NAME));
+        return registry;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("infinispan://?cacheContainer=#cacheContainer&cacheName=" + CUSTOM_CACHE_NAME
+                        + "&customListener=#myCustomListener")
+                        .to("mock:result");
+            }
+        };
+    }
+
+    /**
+     * Puts one entry into the custom cache and checks the two messages that
+     * reach the mock endpoint.
+     */
+    @Test
+    public void createEventConsumed() throws InterruptedException {
+        // Expectations are declared before the cache is touched. Two messages
+        // are expected: message 0 must be a pre-event for "newKey" and
+        // message 1 a post-event (presumably the pre/post pair of the single
+        // put below; MyEmbeddedCustomListener is defined elsewhere).
+        mockResult.expectedMessageCount(2);
+        mockResult.message(0).outHeader(InfinispanConstants.IS_PRE).isEqualTo(true);
+        mockResult.message(0).outHeader(InfinispanConstants.KEY).isEqualTo("newKey");
+        mockResult.message(1).outHeader(InfinispanConstants.IS_PRE).isEqualTo(false);
+
+        basicCacheContainer.getCache(CUSTOM_CACHE_NAME).put("newKey", "newValue");
+
+        mockResult.assertIsSatisfied();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerTest.java
 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerTest.java
new file mode 100644
index 0000000..2b0e881
--- /dev/null
+++ 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * Exercises two consumer routes on the container bound as "cacheContainer":
+ * one subscribed to CACHE_ENTRY_CREATED (mock:result) and one subscribed to
+ * CACHE_ENTRY_EXPIRED (mock:result2).
+ */
+public class InfinispanLocalConsumerTest extends InfinispanTestSupport {
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mockResult;
+
+    @EndpointInject(uri = "mock:result2")
+    private MockEndpoint mockResult2;
+
+    /**
+     * A single put must yield two created notifications for the same key:
+     * first the pre-event, then the post-event.
+     */
+    @Test
+    public void consumerReceivedPreAndPostEntryCreatedEventNotifications() throws Exception {
+        mockResult.expectedMessageCount(2);
+
+        expectEvent(mockResult, 0, "CACHE_ENTRY_CREATED", true, KEY_ONE);
+        expectEvent(mockResult, 1, "CACHE_ENTRY_CREATED", false, KEY_ONE);
+
+        currentCache().put(KEY_ONE, VALUE_ONE);
+
+        mockResult.assertIsSatisfied();
+    }
+
+    /**
+     * An entry stored with a 1000 ms lifespan must yield one expired
+     * notification once the controlled clock has moved 1001 ms ahead and the
+     * entry has been read.
+     */
+    @Test
+    public void consumerReceivedExpirationEventNotification() throws Exception {
+        mockResult2.expectedMessageCount(1);
+
+        expectEvent(mockResult2, 0, "CACHE_ENTRY_EXPIRED", false, "keyTwo");
+
+        injectTimeService();
+        currentCache().put("keyTwo", "valueTwo", 1000, TimeUnit.MILLISECONDS);
+        ts.advance(1001);
+
+        // Expiration events are thrown only after a get if the expiration
+        // reaper thread is not enabled, so the entry is read here.
+        assertNull(currentCache().get("keyTwo"));
+        mockResult2.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("infinispan://localhost?cacheContainer=#cacheContainer&eventTypes=CACHE_ENTRY_CREATED")
+                        .to("mock:result");
+                from("infinispan://localhost?cacheContainer=#cacheContainer&eventTypes=CACHE_ENTRY_EXPIRED")
+                        .to("mock:result2");
+            }
+        };
+    }
+
+    /**
+     * Registers the four header expectations used throughout this class on the
+     * message at the given index: event type, IS_PRE flag, a non-null cache
+     * name, and the key.
+     */
+    private static void expectEvent(MockEndpoint mock, int index, String eventType, boolean pre, Object key) {
+        mock.message(index).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo(eventType);
+        mock.message(index).outHeader(InfinispanConstants.IS_PRE).isEqualTo(pre);
+        mock.message(index).outHeader(InfinispanConstants.CACHE_NAME).isNotNull();
+        mock.message(index).outHeader(InfinispanConstants.KEY).isEqualTo(key);
+    }
+}
+

Reply via email to