CAMEL-8243 Enhance Infinispan listener support - embedded and remote listeners - clustered listeners - custom listeners - event data
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c7b1e905 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c7b1e905 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c7b1e905 Branch: refs/heads/master Commit: c7b1e9050614e7c2a39d0f2d9c32f1c7fdc1e157 Parents: 80b0d0b Author: Tristan Tarrant <ttarr...@redhat.com> Authored: Thu Feb 18 20:01:38 2016 +0100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Sat Feb 20 10:16:23 2016 +0100 ---------------------------------------------------------------------- components/camel-infinispan/pom.xml | 7 + .../InfinispanAsyncEventListener.java | 29 --- .../infinispan/InfinispanConfiguration.java | 34 +++- .../infinispan/InfinispanConstants.java | 1 + .../infinispan/InfinispanConsumer.java | 50 ++++-- .../infinispan/InfinispanConsumerHandler.java | 24 +++ .../infinispan/InfinispanCustomListener.java | 38 ++++ .../infinispan/InfinispanEventListener.java | 46 +++++ .../infinispan/InfinispanSyncEventListener.java | 61 ------- .../InfinispanAsyncClusteredEventListener.java | 28 +++ .../InfinispanAsyncLocalEventListener.java | 29 +++ .../InfinispanConsumerEmbeddedHandler.java | 63 +++++++ .../InfinispanEmbeddedCustomListener.java | 46 +++++ .../InfinispanSyncClusteredEventListener.java | 54 ++++++ .../InfinispanSyncLocalEventListener.java | 64 +++++++ .../remote/InfinispanConsumerRemoteHandler.java | 58 ++++++ .../remote/InfinispanRemoteCustomListener.java | 68 +++++++ .../remote/InfinispanRemoteEventListener.java | 79 ++++++++ .../infinispan/InfinispanAsyncConsumerTest.java | 50 ++++++ .../InfinispanClusterTestSupport.java | 133 ++++++++++++++ .../InfinispanClusteredConsumerTest.java | 106 +++++++++++ ...InfinispanLocalConsumerCustomListenerIT.java | 63 +++++++ .../infinispan/InfinispanLocalConsumerTest.java | 81 +++++++++ .../infinispan/InfinispanProducerTest.java | 179 ++++++++++--------- .../infinispan/InfinispanTestSupport.java | 13 ++ .../infinispan/MyEmbeddedCustomListener.java | 44 +++++ 26 files changed, 1251 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/pom.xml b/components/camel-infinispan/pom.xml index c6e4c07..7dd09bd 100644 --- a/components/camel-infinispan/pom.xml +++ b/components/camel-infinispan/pom.xml @@ -58,6 +58,13 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.infinispan</groupId> + <artifactId>infinispan-core</artifactId> + <version>${infinispan-version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanAsyncEventListener.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanAsyncEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanAsyncEventListener.java deleted file mode 100644 index 85366a5..0000000 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanAsyncEventListener.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.infinispan; - -import java.util.Set; - -import org.infinispan.notifications.Listener; - -@Listener(sync = false) -public class InfinispanAsyncEventListener extends InfinispanSyncEventListener { - - public InfinispanAsyncEventListener(InfinispanConsumer consumer, Set<String> eventTypes) { - super(consumer, eventTypes); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java index a738ce4..100ff9e 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java @@ -41,6 +41,10 @@ public class InfinispanConfiguration { private boolean sync = true; @UriParam(label = "consumer", javaType = "java.lang.String") private Set<String> eventTypes; + @UriParam + private InfinispanCustomListener customListener; + @UriParam(label = "consumer", defaultValue = "false") + private boolean clustered; public String getCommand() { return command; @@ -66,7 +70,7 @@ public class InfinispanConfiguration { /** * Specifies the cache Container to connect - */ + */ public BasicCacheContainer getCacheContainer() { return cacheContainer; } @@ -77,7 +81,7 @@ public class InfinispanConfiguration { /** * Specifies the cache name - */ + */ public String getCacheName() { return cacheName; } @@ -97,6 +101,17 @@ public class InfinispanConfiguration { this.sync = sync; } + /** + * If true, the listener will be installed for the entire cluster + */ + public boolean isClustered() { + return clustered; + } + + public void setClustered(boolean clustered) { + this.clustered = clustered; + } + public Set<String> getEventTypes() { return eventTypes; } @@ -122,4 +137,19 @@ public class InfinispanConfiguration { public void setEventTypes(String eventTypes) { this.eventTypes = new HashSet<String>(Arrays.asList(eventTypes.split(","))); } + + /** + * Returns the custom listener in use, if provided + */ + public InfinispanCustomListener getCustomListener() { + return customListener; + } + + public void setCustomListener(InfinispanCustomListener customListener) { + this.customListener = customListener; + } + + public boolean isCustom() { + return customListener != null; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java index a6ae8a5..681ac65 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java @@ -47,4 +47,5 @@ public interface InfinispanConstants { String MAX_IDLE_TIME = "CamelInfinispanMaxIdleTime"; String MAX_IDLE_TIME_UNIT = "CamelInfinispanMaxIdleTimeUnit"; String IGNORE_RETURN_VALUES = "CamelInfinispanIgnoreReturnValues"; + String EVENT_DATA = "CamelInfinispanEventData"; } http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java index cb5161a..90a5e3e 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java @@ -18,8 +18,12 @@ package org.apache.camel.component.infinispan; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.component.infinispan.embedded.InfinispanConsumerEmbeddedHandler; +import org.apache.camel.component.infinispan.remote.InfinispanConsumerRemoteHandler; import org.apache.camel.impl.DefaultConsumer; import org.infinispan.Cache; +import org.infinispan.commons.api.BasicCache; +import org.infinispan.commons.api.BasicCacheContainer; import org.infinispan.manager.EmbeddedCacheManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,25 +31,28 @@ import org.slf4j.LoggerFactory; public class InfinispanConsumer extends DefaultConsumer { private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class); private final InfinispanConfiguration configuration; - private final InfinispanSyncEventListener listener; + private InfinispanEventListener listener; private EmbeddedCacheManager cacheManager; + private BasicCache<Object, Object> cache; public InfinispanConsumer(InfinispanEndpoint endpoint, Processor processor, InfinispanConfiguration configuration) { super(endpoint, processor); this.configuration = configuration; - if (configuration.isSync()) { - listener = new InfinispanSyncEventListener(this, configuration.getEventTypes()); - } else { - listener = new InfinispanAsyncEventListener(this, configuration.getEventTypes()); - } } public void processEvent(String eventType, boolean isPre, String cacheName, Object key) { + processEvent(eventType, isPre, cacheName, key, null); + } + + public void processEvent(String eventType, boolean isPre, String cacheName, Object key, Object eventData) { Exchange exchange = getEndpoint().createExchange(); exchange.getOut().setHeader(InfinispanConstants.EVENT_TYPE, eventType); exchange.getOut().setHeader(InfinispanConstants.IS_PRE, isPre); exchange.getOut().setHeader(InfinispanConstants.CACHE_NAME, cacheName); exchange.getOut().setHeader(InfinispanConstants.KEY, key); + if (eventData != null) { + exchange.getOut().setHeader(InfinispanConstants.EVENT_DATA, eventData); + } try { getProcessor().process(exchange); @@ -56,19 +63,18 @@ public class InfinispanConsumer extends DefaultConsumer { @Override protected void doStart() throws Exception { - if (configuration.getCacheContainer() instanceof EmbeddedCacheManager) { - cacheManager = (EmbeddedCacheManager) configuration.getCacheContainer(); - Cache<Object, Object> cache; - if (configuration.getCacheName() != null) { - cache = cacheManager.getCache(configuration.getCacheName()); - } else { - cache = cacheManager.getCache(); - } - cache.addListener(listener); + BasicCacheContainer cacheContainer = configuration.getCacheContainer(); + String cacheName = configuration.getCacheName(); + cache = cacheName == null ? cacheContainer.getCache() : cacheContainer.getCache(cacheName); + if (InfinispanUtil.isEmbedded(cacheContainer)) { + listener = InfinispanConsumerEmbeddedHandler.INSTANCE.start(this); + } else if (InfinispanUtil.isRemote(cacheContainer)) { + listener = InfinispanConsumerRemoteHandler.INSTANCE.start(this); } else { - throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + configuration.getCacheContainer()); + throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + cacheContainer); } super.doStart(); + } @Override @@ -78,4 +84,16 @@ public class InfinispanConsumer extends DefaultConsumer { } super.doStop(); } + + public BasicCache<Object, Object> getCache() { + return cache; + } + + public InfinispanEventListener getListener() { + return listener; + } + + public InfinispanConfiguration getConfiguration() { + return configuration; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumerHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumerHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumerHandler.java new file mode 100644 index 0000000..4f83ad9 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumerHandler.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan; + + +public interface InfinispanConsumerHandler { + InfinispanEventListener start(InfinispanConsumer consumer); + + void stop(InfinispanConsumer consumer); +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanCustomListener.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanCustomListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanCustomListener.java new file mode 100644 index 0000000..46c932f --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanCustomListener.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan; + +import java.util.Set; + +/** + * This class is supposed to be extended by users and annotated with @Listener or @ClientListener + * and passed to the consumer endpoint through the 'customListener' parameter. + */ +public abstract class InfinispanCustomListener extends InfinispanEventListener { + + public InfinispanCustomListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) { + super(infinispanConsumer, eventTypes); + } + + public InfinispanConsumer getInfinispanConsumer() { + return infinispanConsumer; + } + + public void setInfinispanConsumer(InfinispanConsumer infinispanConsumer) { + this.infinispanConsumer = infinispanConsumer; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanEventListener.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanEventListener.java new file mode 100644 index 0000000..f40bb57 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanEventListener.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan; + +import java.util.Set; + +public abstract class InfinispanEventListener { + protected InfinispanConsumer infinispanConsumer; + protected Set<String> eventTypes; + protected String cacheName; + + public InfinispanEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) { + this.infinispanConsumer = infinispanConsumer; + this.eventTypes = eventTypes; + } + + public void setInfinispanConsumer(InfinispanConsumer infinispanConsumer) { + this.infinispanConsumer = infinispanConsumer; + } + + public void setEventTypes(Set<String> eventTypes) { + this.eventTypes = eventTypes; + } + + public void setCacheName(String cacheName) { + this.cacheName = cacheName; + } + + protected boolean isAccepted(String eventType) { + return eventTypes == null || eventTypes.isEmpty() || eventTypes.contains(eventType); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanSyncEventListener.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanSyncEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanSyncEventListener.java deleted file mode 100644 index 2ae6f00..0000000 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanSyncEventListener.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.infinispan; - -import java.util.Set; - -import org.infinispan.notifications.Listener; -import org.infinispan.notifications.cachelistener.annotation.CacheEntryActivated; -import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated; -import org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidated; -import org.infinispan.notifications.cachelistener.annotation.CacheEntryLoaded; -import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified; -import org.infinispan.notifications.cachelistener.annotation.CacheEntryPassivated; -import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved; -import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited; -import org.infinispan.notifications.cachelistener.event.CacheEntryEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Listener(sync = true) -public class InfinispanSyncEventListener { - private final transient Logger logger = LoggerFactory.getLogger(this.getClass()); - private final InfinispanConsumer infinispanConsumer; - private final Set<String> eventTypes; - - public InfinispanSyncEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) { - this.infinispanConsumer = infinispanConsumer; - this.eventTypes = eventTypes; - } - - @CacheEntryActivated - @CacheEntryCreated - @CacheEntryInvalidated - @CacheEntryLoaded - @CacheEntryModified - @CacheEntryPassivated - @CacheEntryRemoved - @CacheEntryVisited - public void processEvent(CacheEntryEvent<Object, Object> event) { - logger.trace("Received CacheEntryEvent [{}]", event); - - if (eventTypes == null || eventTypes.isEmpty() || eventTypes.contains(event.getType().toString())) { - infinispanConsumer.processEvent(event.getType().toString(), event.isPre(), event.getCache().getName(), event.getKey()); - } - } -} - http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncClusteredEventListener.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncClusteredEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncClusteredEventListener.java new file mode 100644 index 0000000..8ef544f --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncClusteredEventListener.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded; + +import java.util.Set; +import org.apache.camel.component.infinispan.InfinispanConsumer; +import org.infinispan.notifications.Listener; + +@Listener(clustered = true, sync = false) +public class InfinispanAsyncClusteredEventListener extends InfinispanSyncLocalEventListener { + public InfinispanAsyncClusteredEventListener(InfinispanConsumer consumer, Set<String> eventTypes) { + super(consumer, eventTypes); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java new file mode 100644 index 0000000..4f50950 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded; + +import java.util.Set; +import org.apache.camel.component.infinispan.InfinispanConsumer; +import org.infinispan.notifications.Listener; + +@Listener(clustered = false, sync = false) +public class InfinispanAsyncLocalEventListener extends InfinispanSyncLocalEventListener { + + public InfinispanAsyncLocalEventListener(InfinispanConsumer consumer, Set<String> eventTypes) { + super(consumer, eventTypes); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java new file mode 100644 index 0000000..d7844e6 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded; + +import org.apache.camel.component.infinispan.InfinispanConfiguration; +import org.apache.camel.component.infinispan.InfinispanConsumer; +import org.apache.camel.component.infinispan.InfinispanConsumerHandler; +import org.apache.camel.component.infinispan.InfinispanCustomListener; +import org.apache.camel.component.infinispan.InfinispanEventListener; +import org.infinispan.Cache; + +public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsumerHandler { + public static final InfinispanConsumerHandler INSTANCE = new InfinispanConsumerEmbeddedHandler(); + + private InfinispanConsumerEmbeddedHandler() { + } + + @Override + public InfinispanEventListener start(InfinispanConsumer consumer) { + Cache<?, ?> embeddedCache = (Cache<?, ?>) consumer.getCache(); + InfinispanConfiguration configuration = consumer.getConfiguration(); + InfinispanEventListener listener; + if (configuration.isCustom()) { + listener = configuration.getCustomListener(); + ((InfinispanCustomListener)listener).setInfinispanConsumer(consumer); + } else if (configuration.isClustered()) { + if (configuration.isSync()) { + listener = new InfinispanSyncClusteredEventListener(consumer, configuration.getEventTypes()); + } else { + listener = new InfinispanAsyncClusteredEventListener(consumer, configuration.getEventTypes()); + } + } else { + if (configuration.isSync()) { + listener = new InfinispanSyncLocalEventListener(consumer, configuration.getEventTypes()); + } else { + listener = new InfinispanAsyncLocalEventListener(consumer, configuration.getEventTypes()); + } + } + embeddedCache.addListener(listener); + return listener; + } + + @Override + public void stop(InfinispanConsumer consumer) { + Cache<?, ?> embeddedCache = (Cache<?, ?>) consumer.getCache(); + embeddedCache.removeListener(consumer.getListener()); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedCustomListener.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedCustomListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedCustomListener.java new file mode 100644 index 0000000..040f406 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedCustomListener.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded; + +import org.apache.camel.component.infinispan.InfinispanCustomListener; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved; +import org.infinispan.notifications.cachelistener.event.CacheEntryEvent; + +/** + * This class is supposed to be extended by users and annotated with @Listener + * and passed to the consumer endpoint through the 'customListener' parameter. + */ +public abstract class InfinispanEmbeddedCustomListener extends InfinispanCustomListener { + + public InfinispanEmbeddedCustomListener() { + super(null, null); + } + + @CacheEntryCreated + @CacheEntryModified + @CacheEntryRemoved + @CacheEntryExpired + public void processEvent(CacheEntryEvent<Object, Object> event) { + if (isAccepted(event.getType().toString())) { + infinispanConsumer.processEvent(event.getType().toString(), event.isPre(), event.getCache().getName(), event.getKey()); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java new file mode 100644 index 0000000..bcf6957 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded; + +import java.util.Set; +import org.apache.camel.component.infinispan.InfinispanConsumer; +import org.apache.camel.component.infinispan.InfinispanEventListener; +import org.infinispan.notifications.Listener; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved; +import org.infinispan.notifications.cachelistener.event.CacheEntryEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Listener(clustered = true, sync = true) +public class InfinispanSyncClusteredEventListener extends InfinispanEventListener { + /*clustered listeners only listen for post events*/ + private static final boolean IS_PRE = false; + + private final transient Logger logger = LoggerFactory.getLogger(this.getClass()); + + public InfinispanSyncClusteredEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) { + super(infinispanConsumer, eventTypes); + } + + @CacheEntryCreated + @CacheEntryModified + @CacheEntryRemoved + @CacheEntryExpired + public void processEvent(CacheEntryEvent<Object, Object> event) { + logger.trace("Received CacheEntryEvent [{}]", event); + + if (isAccepted(event.getType().toString())) { + infinispanConsumer.processEvent(event.getType().toString(), IS_PRE, event.getCache().getName(), event.getKey()); + } + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java new file mode 100644 index 0000000..1cf9325 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded; + +import java.util.Set; +import org.apache.camel.component.infinispan.InfinispanConsumer; +import org.apache.camel.component.infinispan.InfinispanEventListener; +import org.infinispan.notifications.Listener; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryActivated; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidated; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryLoaded; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryPassivated; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited; +import org.infinispan.notifications.cachelistener.event.CacheEntryEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Listener(clustered = false, sync = true) +public class InfinispanSyncLocalEventListener extends InfinispanEventListener { + private final transient Logger logger = LoggerFactory.getLogger(this.getClass()); + + public InfinispanSyncLocalEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) { + super(infinispanConsumer, eventTypes); + } + + @CacheEntryActivated + @CacheEntryCreated + @CacheEntryInvalidated + @CacheEntryLoaded + @CacheEntryModified + @CacheEntryPassivated + @CacheEntryRemoved + @CacheEntryVisited + @CacheEntryExpired + public void processEvent(CacheEntryEvent<Object, Object> event) { + logger.trace("Received CacheEntryEvent [{}]", event); + + dispatch(event.getType().toString(), event.isPre(), event.getCache().getName(), event.getKey()); + } + + private void dispatch(String eventType, boolean isPre, String cacheName, Object key) { + if (isAccepted(eventType)) { + infinispanConsumer.processEvent(eventType, isPre, cacheName, key, null); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java new file mode 100644 index 0000000..48ec911 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.remote; + +import org.apache.camel.component.infinispan.InfinispanConfiguration; +import org.apache.camel.component.infinispan.InfinispanConsumer; +import org.apache.camel.component.infinispan.InfinispanConsumerHandler; +import org.apache.camel.component.infinispan.InfinispanCustomListener; +import org.apache.camel.component.infinispan.InfinispanEventListener; +import org.infinispan.client.hotrod.RemoteCache; + +public final class InfinispanConsumerRemoteHandler implements InfinispanConsumerHandler { + public static final InfinispanConsumerHandler INSTANCE = new InfinispanConsumerRemoteHandler(); + + private InfinispanConsumerRemoteHandler() { + } + + @Override + public InfinispanEventListener start(InfinispanConsumer consumer) { + if (consumer.getConfiguration().isSync()) { + throw new UnsupportedOperationException("Sync listeners not supported for remote caches."); + } + RemoteCache<?, ?> remoteCache = (RemoteCache<?, ?>) consumer.getCache(); + InfinispanConfiguration configuration = consumer.getConfiguration(); + InfinispanEventListener listener; + if (configuration.isCustom()) { + listener = configuration.getCustomListener(); + ((InfinispanCustomListener)listener).setInfinispanConsumer(consumer); + } else { + listener = new InfinispanRemoteEventListener(consumer, configuration.getEventTypes()); + } + remoteCache.addClientListener(listener); + listener.setCacheName(remoteCache.getName()); + return listener; + + } + + @Override + public void stop(InfinispanConsumer consumer) { + RemoteCache<?, ?> remoteCache = (RemoteCache<?, ?>) consumer.getCache(); + remoteCache.removeClientListener(consumer.getListener()); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteCustomListener.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteCustomListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteCustomListener.java new file mode 100644 index 0000000..7ab0447 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteCustomListener.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.remote; + +import org.apache.camel.component.infinispan.InfinispanCustomListener; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved; +import org.infinispan.client.hotrod.annotation.ClientCacheFailover; +import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent; +import org.infinispan.client.hotrod.event.ClientCacheEntryCustomEvent; +import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent; +import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent; +import org.infinispan.client.hotrod.event.ClientEvent; + +/** + * This class is supposed to be extended by users and annotated with @ClientListener + * and passed to the consumer endpoint through the 'customListener' parameter. + */ +public abstract class InfinispanRemoteCustomListener extends InfinispanCustomListener { + public InfinispanRemoteCustomListener() { + super(null, null); + } + + @ClientCacheEntryCreated + @ClientCacheEntryModified + @ClientCacheEntryRemoved + @ClientCacheEntryExpired + @ClientCacheFailover + public void processClientEvent(ClientEvent event) { + if (isAccepted(event.getType().toString())) { + infinispanConsumer.processEvent(event.getType().toString(), false, cacheName, getKey(event), getEventData(event)); + } + } + + private Object getKey(ClientEvent event) { + if (event instanceof ClientCacheEntryCreatedEvent) { + return ((ClientCacheEntryCreatedEvent<?>) event).getKey(); + } else if (event instanceof ClientCacheEntryModifiedEvent) { + return ((ClientCacheEntryModifiedEvent<?>) event).getKey(); + } else if (event instanceof ClientCacheEntryRemovedEvent) { + return ((ClientCacheEntryRemovedEvent<?>) event).getKey(); + } + return null; + } + + private Object getEventData(ClientEvent e) { + if (e instanceof ClientCacheEntryCustomEvent) { + return ((ClientCacheEntryCustomEvent<?>) e).getEventData(); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteEventListener.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteEventListener.java new file mode 100644 index 0000000..40807eb --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteEventListener.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.remote; + +import java.util.Set; +import org.apache.camel.component.infinispan.InfinispanConsumer; +import org.apache.camel.component.infinispan.InfinispanEventListener; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved; +import org.infinispan.client.hotrod.annotation.ClientCacheFailover; +import org.infinispan.client.hotrod.annotation.ClientListener; +import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent; +import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent; +import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent; +import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent; +import org.infinispan.client.hotrod.event.ClientCacheFailoverEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ClientListener +public class InfinispanRemoteEventListener extends InfinispanEventListener { + private final transient Logger logger = LoggerFactory.getLogger(this.getClass()); + + public InfinispanRemoteEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) { + super(infinispanConsumer, eventTypes); + } + + @ClientCacheEntryCreated + public void processEvent(ClientCacheEntryCreatedEvent<Object> event) { + logger.trace("Received ClientEvent [{}]", event); + dispatch(event.getType().toString(), false, cacheName, event.getKey()); + } + + @ClientCacheEntryModified + public void processEvent(ClientCacheEntryModifiedEvent<Object> event) { + logger.trace("Received ClientEvent [{}]", event); + dispatch(event.getType().toString(), false, cacheName, event.getKey()); + } + + @ClientCacheEntryRemoved + public void processEvent(ClientCacheEntryRemovedEvent<Object> event) { + logger.trace("Received ClientEvent [{}]", event); + dispatch(event.getType().toString(), false, cacheName, event.getKey()); + } + + @ClientCacheFailover + public void processEvent(ClientCacheFailoverEvent event) { + logger.trace("Received ClientEvent [{}]", event); + dispatch(event.getType().toString(), false, cacheName, null); + } + + @ClientCacheEntryExpired + public void processEvent(ClientCacheEntryExpiredEvent<Object> event) { + logger.trace("Received ClientEvent [{}]", event); + dispatch(event.getType().toString(), false, cacheName, event.getKey()); + } + + private void dispatch(String eventType, boolean isPre, String cacheName, Object key) { + if (isAccepted(eventType)) { + infinispanConsumer.processEvent(eventType, isPre, cacheName, key, null); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanAsyncConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanAsyncConsumerTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanAsyncConsumerTest.java new file mode 100644 index 0000000..dc44483 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanAsyncConsumerTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +public class InfinispanAsyncConsumerTest extends InfinispanTestSupport { + + @EndpointInject(uri = "mock:result") + private MockEndpoint mockResult; + + @Test + public void consumerReceivedPreAndPostEntryCreatedEventNotifications() throws Exception { + mockResult.expectedMessageCount(2); + mockResult.setResultMinimumWaitTime(900); + + currentCache().put(KEY_ONE, VALUE_ONE); + mockResult.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("infinispan://localhost?cacheContainer=#cacheContainer&sync=false&eventTypes=CACHE_ENTRY_CREATED") + .delayer(500) + .to("mock:result"); + } + }; + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java new file mode 100644 index 0000000..b47b1db --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan; + +import java.util.List; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.infinispan.Cache; +import org.infinispan.commons.api.BasicCacheContainer; +import org.infinispan.configuration.cache.CacheMode; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.manager.EmbeddedCacheManager; +import org.infinispan.test.MultipleCacheManagersTest; +import org.infinispan.test.TestingUtil; +import org.infinispan.transaction.TransactionMode; +import org.infinispan.util.ControlledTimeService; +import org.infinispan.util.TimeService; +import org.junit.Before; + +public class InfinispanClusterTestSupport extends CamelTestSupport { + + protected static final String KEY_ONE = "keyOne"; + protected static final String VALUE_ONE = "valueOne"; + + protected List<EmbeddedCacheManager> clusteredCacheContainers; + + protected ControlledTimeService ts0; + protected ControlledTimeService ts1; + + protected static class ClusteredCacheSupport extends MultipleCacheManagersTest { + + protected ConfigurationBuilder builderUsed; + protected final boolean tx; + protected final CacheMode cacheMode; + protected String cacheName; + protected final int clusterSize; + + public ClusteredCacheSupport(CacheMode cacheMode, boolean tx, int clusterSize) { + this.tx = tx; + this.cacheMode = cacheMode; + this.clusterSize = clusterSize; + } + + public ClusteredCacheSupport(CacheMode cacheMode, String cacheName, boolean tx, int clusterSize) { + this.tx = tx; + this.cacheMode = cacheMode; + this.cacheName = cacheName; + this.clusterSize = clusterSize; + } + + @Override + public void createCacheManagers() throws Throwable { + builderUsed = new ConfigurationBuilder(); + builderUsed.clustering().cacheMode(cacheMode); + if (tx) { + builderUsed.transaction().transactionMode(TransactionMode.TRANSACTIONAL); + } + if (cacheMode.isDistributed()) { + builderUsed.clustering().hash().numOwners(1); + } + if (cacheName != null) { + createClusteredCaches(clusterSize, cacheName, builderUsed); + } else { + createClusteredCaches(clusterSize, builderUsed); + } + } + } + + @Override + @Before + public void setUp() throws Exception { + ClusteredCacheSupport cluster = new ClusteredCacheSupport(CacheMode.DIST_SYNC, false, 2); + try { + cluster.createCacheManagers(); + clusteredCacheContainers = cluster.getCacheManagers(); + } catch (Throwable ex) { + throw new Exception(ex); + } + super.setUp(); + } + + @Override + public void tearDown() throws Exception { + for (BasicCacheContainer container: clusteredCacheContainers) { + container.stop(); + } + super.tearDown(); + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + registry.bind("cacheContainer", clusteredCacheContainers.get(0)); + return registry; + } + + protected Cache<Object, Object> defaultCache() { + return clusteredCacheContainers.get(0).getCache(); + } + + protected Cache<Object, Object> defaultCache(int index) { + return clusteredCacheContainers.get(index).getCache(); + } + + protected Cache<Object, Object> namedCache(String name) { + return clusteredCacheContainers.get(0).getCache(name); + } + + protected Cache<Object, Object> namedCache(int index, String name) { + return clusteredCacheContainers.get(index).getCache(name); + } + + protected void injectTimeService() { + ts0 = new ControlledTimeService(0); + TestingUtil.replaceComponent(clusteredCacheContainers.get(0), TimeService.class, ts0, true); + ts1 = new ControlledTimeService(0); + TestingUtil.replaceComponent(clusteredCacheContainers.get(1), TimeService.class, ts1, true); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusteredConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusteredConsumerTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusteredConsumerTest.java new file mode 100644 index 0000000..ab02821 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusteredConsumerTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan; + +import java.util.concurrent.TimeUnit; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.infinispan.distribution.MagicKey; +import org.infinispan.test.fwk.TestResourceTracker; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class InfinispanClusteredConsumerTest extends InfinispanClusterTestSupport { + + private static final long WAIT_TIMEOUT = 5000; + + @EndpointInject(uri = "mock:resultCreated") + private MockEndpoint mockResultCreatedEvents; + + @EndpointInject(uri = "mock:resultExpired") + private MockEndpoint mockResultExpiredEvents; + + @BeforeClass + public static void beforeClass() { + TestResourceTracker.testStarted(InfinispanClusteredConsumerTest.class.getName()); + } + + @AfterClass + public static void afterClass() { + TestResourceTracker.testFinished(InfinispanClusteredConsumerTest.class.getName()); + } + + @Test + public void consumerReceivedPostEntryCreatedEventNotifications() throws Exception { + MagicKey key = new MagicKey(defaultCache(1)); + + mockResultCreatedEvents.expectedMessageCount(1); + + mockResultCreatedEvents.message(0).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo("CACHE_ENTRY_CREATED"); + mockResultCreatedEvents.message(0).outHeader(InfinispanConstants.IS_PRE).isEqualTo(false); + mockResultCreatedEvents.message(0).outHeader(InfinispanConstants.CACHE_NAME).isNotNull(); + mockResultCreatedEvents.message(0).outHeader(InfinispanConstants.KEY).isEqualTo(key.toString()); + + defaultCache(1).put(key, "value"); + mockResultCreatedEvents.assertIsSatisfied(); + } + + @Test + public void consumerReceivedExpirationEventNotifications() throws Exception { + MagicKey key = new MagicKey(defaultCache(1)); + + mockResultCreatedEvents.expectedMessageCount(1); + mockResultExpiredEvents.expectedMessageCount(1); + + mockResultCreatedEvents.message(0).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo("CACHE_ENTRY_CREATED"); + mockResultCreatedEvents.message(0).outHeader(InfinispanConstants.IS_PRE).isEqualTo(false); + mockResultCreatedEvents.message(0).outHeader(InfinispanConstants.CACHE_NAME).isNotNull(); + mockResultCreatedEvents.message(0).outHeader(InfinispanConstants.KEY).isEqualTo(key.toString()); + + mockResultExpiredEvents.message(0).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo("CACHE_ENTRY_EXPIRED"); + mockResultExpiredEvents.message(0).outHeader(InfinispanConstants.IS_PRE).isEqualTo(false); + mockResultExpiredEvents.message(0).outHeader(InfinispanConstants.CACHE_NAME).isNotNull(); + mockResultExpiredEvents.message(0).outHeader(InfinispanConstants.KEY).isEqualTo(key.toString()); + + injectTimeService(); + + defaultCache(1).put(key, "value", 1000, TimeUnit.MILLISECONDS); + + ts0.advance(1001); + ts1.advance(1001); + + assertNull(defaultCache(1).get(key)); + mockResultCreatedEvents.assertIsSatisfied(); + mockResultExpiredEvents.assertIsSatisfied(WAIT_TIMEOUT); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("infinispan://localhost?cacheContainer=#cacheContainer&clustered=true&eventTypes=CACHE_ENTRY_CREATED") + .to("mock:resultCreated"); + from("infinispan://localhost?cacheContainer=#cacheContainer&clustered=true&eventTypes=CACHE_ENTRY_EXPIRED") + .to("mock:resultExpired"); + } + }; + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerCustomListenerIT.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerCustomListenerIT.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerCustomListenerIT.java new file mode 100644 index 0000000..40015e9 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerCustomListenerIT.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.junit.Test; + +public class InfinispanLocalConsumerCustomListenerIT extends InfinispanTestSupport { + + private static final String CUSTOM_CACHE_NAME = "customCacheName"; + + @EndpointInject(uri = "mock:result") + private MockEndpoint mockResult; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + registry.bind("myCustomListener", new MyEmbeddedCustomListener(CUSTOM_CACHE_NAME)); + return registry; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("infinispan://?cacheContainer=#cacheContainer&cacheName=" + CUSTOM_CACHE_NAME + "&customListener=#myCustomListener") + .to("mock:result"); + } + }; + } + + @Test + public void createEventConsumed() throws InterruptedException { + //One for the existing entry and one for the new entry. + mockResult.expectedMessageCount(2); + + basicCacheContainer.getCache(CUSTOM_CACHE_NAME).put("newKey", "newValue"); + + mockResult.message(0).outHeader(InfinispanConstants.IS_PRE).isEqualTo(true); + mockResult.message(0).outHeader(InfinispanConstants.KEY).isEqualTo("newKey"); + mockResult.message(1).outHeader(InfinispanConstants.IS_PRE).isEqualTo(false); + + mockResult.assertIsSatisfied(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7b1e905/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerTest.java new file mode 100644 index 0000000..2b0e881 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanLocalConsumerTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan; + +import java.util.concurrent.TimeUnit; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +public class InfinispanLocalConsumerTest extends InfinispanTestSupport { + + @EndpointInject(uri = "mock:result") + private MockEndpoint mockResult; + + @EndpointInject(uri = "mock:result2") + private MockEndpoint mockResult2; + + @Test + public void consumerReceivedPreAndPostEntryCreatedEventNotifications() throws Exception { + mockResult.expectedMessageCount(2); + + mockResult.message(0).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo("CACHE_ENTRY_CREATED"); + mockResult.message(0).outHeader(InfinispanConstants.IS_PRE).isEqualTo(true); + mockResult.message(0).outHeader(InfinispanConstants.CACHE_NAME).isNotNull(); + mockResult.message(0).outHeader(InfinispanConstants.KEY).isEqualTo(KEY_ONE); + + mockResult.message(1).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo("CACHE_ENTRY_CREATED"); + mockResult.message(1).outHeader(InfinispanConstants.IS_PRE).isEqualTo(false); + mockResult.message(1).outHeader(InfinispanConstants.CACHE_NAME).isNotNull(); + mockResult.message(1).outHeader(InfinispanConstants.KEY).isEqualTo(KEY_ONE); + + currentCache().put(KEY_ONE, VALUE_ONE); + mockResult.assertIsSatisfied(); + } + + @Test + public void consumerReceivedExpirationEventNotification() throws Exception { + mockResult2.expectedMessageCount(1); + + mockResult2.message(0).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo("CACHE_ENTRY_EXPIRED"); + mockResult2.message(0).outHeader(InfinispanConstants.IS_PRE).isEqualTo(false); + mockResult2.message(0).outHeader(InfinispanConstants.CACHE_NAME).isNotNull(); + mockResult2.message(0).outHeader(InfinispanConstants.KEY).isEqualTo("keyTwo"); + + injectTimeService(); + currentCache().put("keyTwo", "valueTwo", 1000, TimeUnit.MILLISECONDS); + ts.advance(1001); + //expiration events are thrown only after a get if expiration reaper thread is not enabled + assertNull(currentCache().get("keyTwo")); + mockResult2.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("infinispan://localhost?cacheContainer=#cacheContainer&eventTypes=CACHE_ENTRY_CREATED") + .to("mock:result"); + from("infinispan://localhost?cacheContainer=#cacheContainer&eventTypes=CACHE_ENTRY_EXPIRED") + .to("mock:result2"); + } + }; + } +} +