Repository: camel Updated Branches: refs/heads/master bef3d9224 -> dbd68347b
CAMEL-10287: Infinispan RoutePolicy to have one route being master, and others as slaves Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dbd68347 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dbd68347 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dbd68347 Branch: refs/heads/master Commit: dbd68347b29ce7813262ed2a63fb31e36e94a034 Parents: bef3d92 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Wed Apr 5 18:37:21 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Fri Apr 7 17:36:32 2017 +0200 ---------------------------------------------------------------------- components/camel-infinispan/pom.xml | 14 +- .../infinispan/InfinispanConfiguration.java | 26 + .../component/infinispan/InfinispanManager.java | 44 +- .../component/infinispan/InfinispanUtil.java | 2 +- .../policy/InfinispanRoutePolicy.java | 522 +++++++++++++++++++ .../InfinispanEmbeddedRoutePolicyTest.java | 28 + .../policy/InfinispanRemoteRoutePolicyTest.java | 40 ++ .../policy/InfinispanRoutePolicyTestBase.java | 118 +++++ .../src/test/resources/log4j.xml | 2 +- 9 files changed, 771 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/pom.xml b/components/camel-infinispan/pom.xml index ddd590d..ea8751c 100644 --- a/components/camel-infinispan/pom.xml +++ b/components/camel-infinispan/pom.xml @@ -112,31 +112,25 @@ <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> - <version>${log4j2-25-version}</version> + <version>${log4j2-version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> - <version>${log4j2-25-version}</version> + <version>${log4j2-version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> - <version>${log4j2-25-version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-1.2-api</artifactId> - <version>${log4j2-25-version}</version> + <version>${log4j2-version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-jcl</artifactId> - <version>${log4j2-25-version}</version> + <version>${log4j2-version}</version> <scope>test</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java index 0ab5852..0835974 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java @@ -17,7 +17,9 @@ package org.apache.camel.component.infinispan; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import org.apache.camel.spi.Metadata; @@ -60,6 +62,8 @@ public class InfinispanConfiguration { private Flag[] flags; @UriParam(label = "advanced") private String configurationUri; + @UriParam(label = "advanced") + private Map<String, String> configurationProperties; public String getCommand() { @@ -223,4 +227,26 @@ public class InfinispanConfiguration { public void setConfigurationUri(String configurationUri) { this.configurationUri = configurationUri; } + + public Map<String, String> getConfigurationProperties() { + return configurationProperties; + } + + /** + * Infinispan configuration properties. + */ + public void setConfigurationProperties(Map<String, String> configurationProperties) { + this.configurationProperties = configurationProperties; + } + + /** + * Add configuration + */ + public void addConfigurationProperty(String key, String value) { + if (this.configurationProperties == null) { + this.configurationProperties = new HashMap<>(); + } + + this.configurationProperties.put(key, value); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java index eab846b..8b739bb 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java @@ -17,6 +17,8 @@ package org.apache.camel.component.infinispan; +import java.util.Properties; + import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Service; @@ -25,6 +27,7 @@ import org.infinispan.client.hotrod.RemoteCacheManager; import org.infinispan.client.hotrod.configuration.ConfigurationBuilder; import org.infinispan.commons.api.BasicCache; import org.infinispan.commons.api.BasicCacheContainer; +import org.infinispan.manager.DefaultCacheManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +39,11 @@ public class InfinispanManager implements Service { private BasicCacheContainer cacheContainer; private boolean isManagedCacheContainer; + public InfinispanManager() { + this.camelContext = null; + this.configuration = new InfinispanConfiguration(); + this.configuration.setCacheContainer(new DefaultCacheManager(true)); + } public InfinispanManager(InfinispanConfiguration configuration) { this(null, configuration); @@ -50,24 +58,34 @@ public class InfinispanManager implements Service { public void start() throws Exception { cacheContainer = configuration.getCacheContainer(); if (cacheContainer == null) { + ConfigurationBuilder builder = new ConfigurationBuilder(); + builder.classLoader(Thread.currentThread().getContextClassLoader()); + + Properties properties = new Properties(); + String uri = configuration.getConfigurationUri(); if (uri != null && camelContext != null) { uri = camelContext.resolvePropertyPlaceholders(uri); } - ConfigurationBuilder configurationBuilder = new ConfigurationBuilder() - .classLoader(Thread.currentThread().getContextClassLoader()); - if (uri != null) { - configurationBuilder.withProperties(InfinispanUtil.loadProperties(camelContext, uri)); + properties.putAll(InfinispanUtil.loadProperties(camelContext, uri)); + } + if (configuration.getConfigurationProperties() != null) { + properties.putAll(configuration.getConfigurationProperties()); } - cacheContainer = new RemoteCacheManager( - configurationBuilder - .addServers(configuration.getHost()) - .build(), - true); + if (!properties.isEmpty()) { + builder.withProperties(properties); + } + + if (configuration.getHost() != null) { + builder.addServers(configuration.getHost()); + } + + + cacheContainer = new RemoteCacheManager(builder.build(), true); isManagedCacheContainer = true; } } @@ -91,22 +109,22 @@ public class InfinispanManager implements Service { return InfinispanUtil.isRemote(cacheContainer); } - public BasicCache<Object, Object> getCache() { + public <K, V > BasicCache<K, V> getCache() { return getCache(configuration.getCacheName()); } - public BasicCache<Object, Object> getCache(Exchange exchange) { + public <K, V > BasicCache<K, V> getCache(Exchange exchange) { return getCache(exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, String.class)); } - public BasicCache<Object, Object> getCache(String cacheName) { + public <K, V > BasicCache<K, V> getCache(String cacheName) { if (cacheName == null) { cacheName = configuration.getCacheName(); } LOGGER.trace("Cache[{}]", cacheName); - BasicCache<Object, Object> cache = InfinispanUtil.getCache(cacheContainer, cacheName); + BasicCache<K, V> cache = InfinispanUtil.getCache(cacheContainer, cacheName); if (configuration.hasFlags() && InfinispanUtil.isEmbedded(cache)) { cache = new DecoratedCache(InfinispanUtil.asAdvanced(cache), configuration.getFlags()); } http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java index f562bb2..4de6319 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java @@ -106,7 +106,7 @@ public final class InfinispanUtil { return ObjectHelper.isEmpty(message.getHeader(header)); } - public static BasicCache<Object, Object> getCache(BasicCacheContainer cacheContainer, String cacheName) { + public static <K, V> BasicCache<K, V> getCache(BasicCacheContainer cacheContainer, String cacheName) { return ObjectHelper.isEmpty(cacheName) ? cacheContainer.getCache() : cacheContainer.getCache(cacheName); } http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java new file mode 100644 index 0000000..7103edf --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java @@ -0,0 +1,522 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.policy; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Route; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.Service; +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.component.infinispan.InfinispanConfiguration; +import org.apache.camel.component.infinispan.InfinispanManager; +import org.apache.camel.component.infinispan.InfinispanUtil; +import org.apache.camel.support.RoutePolicySupport; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.StringHelper; +import org.infinispan.Cache; +import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved; +import org.infinispan.client.hotrod.annotation.ClientListener; +import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent; +import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent; +import org.infinispan.commons.api.BasicCache; +import org.infinispan.commons.api.BasicCacheContainer; +import org.infinispan.notifications.Listener; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved; +import org.infinispan.notifications.cachelistener.event.CacheEntryEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ManagedResource(description = "Route policy using Infinispan as clustered lock") +public class InfinispanRoutePolicy extends RoutePolicySupport implements CamelContextAware { + private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanRoutePolicy.class); + + private final AtomicBoolean leader; + private final Set<Route> suspendedRoutes; + private final InfinispanManager manager; + + private Route route; + private CamelContext camelContext; + private ScheduledExecutorService executorService; + private boolean shouldStopConsumer; + private String lockMapName; + private String lockKey; + private String lockValue; + private long lifespan; + private TimeUnit lifespanTimeUnit; + private ScheduledFuture<?> future; + private Service service; + + public InfinispanRoutePolicy(InfinispanConfiguration configuration) { + this(new InfinispanManager(configuration), null, null); + } + + public InfinispanRoutePolicy(InfinispanManager manager) { + this(manager, null, null); + } + + public InfinispanRoutePolicy(InfinispanManager manager, String lockKey, String lockValue) { + this.manager = manager; + this.suspendedRoutes = new HashSet<>(); + this.leader = new AtomicBoolean(false); + this.shouldStopConsumer = true; + this.lockKey = lockKey; + this.lockValue = lockValue; + this.lifespan = 30; + this.lifespanTimeUnit = TimeUnit.SECONDS; + this.service = null; + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public void onInit(Route route) { + super.onInit(route); + this.route = route; + } + + @Override + public void onStart(Route route) { + try { + startService(); + } catch (Exception e) { + throw new RuntimeCamelException(e); + } + + if (!leader.get() && shouldStopConsumer) { + stopConsumer(route); + } + } + + @Override + public synchronized void onStop(Route route) { + try { + stopService(); + } catch (Exception e) { + throw new RuntimeCamelException(e); + } + + suspendedRoutes.remove(route); + } + + @Override + public synchronized void onSuspend(Route route) { + try { + stopService(); + } catch (Exception e) { + throw new RuntimeCamelException(e); + } + + suspendedRoutes.remove(route); + } + + @Override + protected void doStart() throws Exception { + // validate + StringHelper.notEmpty(lockMapName, "lockMapName", this); + StringHelper.notEmpty(lockKey, "lockKey", this); + StringHelper.notEmpty(lockValue, "lockValue", this); + ObjectHelper.notNull(camelContext, "camelContext", this); + + if (this.lockValue == null) { + this.lockValue = camelContext.getUuidGenerator().generateUuid(); + } + + this.manager.start(); + this.executorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "InfinispanRoutePolicy"); + + if (lifespanTimeUnit.convert(lifespan, TimeUnit.SECONDS) < 2) { + throw new IllegalArgumentException("Lock lifespan can not be less that 2 seconds"); + } + + BasicCache<String, String> cache = manager.getCache(lockMapName); + if (manager.isCacheContainerEmbedded()) { + this.service = new EmbeddedCacheService(InfinispanUtil.asEmbedded(cache)); + } else { + this.service = new RemoteCacheService(InfinispanUtil.asRemote(cache)); + } + + super.doStart(); + } + + @Override + protected void doStop() throws Exception { + if (future != null) { + future.cancel(true); + future = null; + } + + if (this.service != null) { + this.service.stop(); + } + + getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService); + + leader.set(false); + manager.stop(); + + super.doStop(); + } + + private void startService() throws Exception { + if (service == null) { + throw new IllegalStateException("An Infinispan CacheService should be configured"); + } + + service.start(); + } + + private void stopService() throws Exception { + leader.set(false); + + if (this.service != null) { + this.service.stop(); + } + } + + // ************************************************************************* + // + // ************************************************************************* + + protected void setLeader(boolean isLeader) { + if (isLeader && leader.compareAndSet(false, isLeader)) { + LOGGER.info("Leadership taken (map={}, key={}, val={})", lockMapName, lockKey, lockValue); + + startAllStoppedConsumers(); + } else if (!isLeader && leader.getAndSet(isLeader)) { + LOGGER.info("Leadership lost (map={}, key={} val={})", lockMapName, lockKey, lockValue); + } + + if (!isLeader && this.route != null) { + stopConsumer(route); + } + } + + private synchronized void startConsumer(Route route) { + try { + if (suspendedRoutes.contains(route)) { + startConsumer(route.getConsumer()); + suspendedRoutes.remove(route); + } + } catch (Exception e) { + handleException(e); + } + } + + private synchronized void stopConsumer(Route route) { + try { + if (!suspendedRoutes.contains(route)) { + LOGGER.debug("Stopping consumer for {} ({})", route.getId(), route.getConsumer()); + stopConsumer(route.getConsumer()); + suspendedRoutes.add(route); + } + } catch (Exception e) { + handleException(e); + } + } + + private synchronized void startAllStoppedConsumers() { + try { + for (Route route : suspendedRoutes) { + LOGGER.debug("Starting consumer for {} ({})", route.getId(), route.getConsumer()); + startConsumer(route.getConsumer()); + } + + suspendedRoutes.clear(); + } catch (Exception e) { + handleException(e); + } + } + + // ************************************************************************* + // Getter/Setters + // ************************************************************************* + + @ManagedAttribute(description = "The route id") + public String getRouteId() { + if (route != null) { + return route.getId(); + } + return null; + } + + @ManagedAttribute(description = "The consumer endpoint", mask = true) + public String getEndpointUrl() { + if (route != null && route.getConsumer() != null && route.getConsumer().getEndpoint() != null) { + return route.getConsumer().getEndpoint().toString(); + } + return null; + } + + @ManagedAttribute(description = "Whether to stop consumer when starting up and failed to become master") + public boolean isShouldStopConsumer() { + return shouldStopConsumer; + } + + public void setShouldStopConsumer(boolean shouldStopConsumer) { + this.shouldStopConsumer = shouldStopConsumer; + } + + @ManagedAttribute(description = "The lock map name") + public String getLockMapName() { + return lockMapName; + } + + public void setLockMapName(String lockMapName) { + this.lockMapName = lockMapName; + } + + @ManagedAttribute(description = "The lock key") + public String getLockKey() { + return lockKey; + } + + public void setLockKey(String lockKey) { + this.lockKey = lockKey; + } + + @ManagedAttribute(description = "The lock value") + public String getLockValue() { + return lockValue; + } + + public void setLockValue(String lockValue) { + this.lockValue = lockValue; + } + + + @ManagedAttribute(description = "The key lifespan for the lock") + public long getLifespan() { + return lifespan; + } + + public void setLifespan(long lifespan) { + this.lifespan = lifespan; + } + + public void setLifespan(long lifespan, TimeUnit lifespanTimeUnit) { + this.lifespan = lifespan; + this.lifespanTimeUnit = lifespanTimeUnit; + } + + @ManagedAttribute(description = "The key lifespan time unit for the lock") + public TimeUnit getLifespanTimeUnit() { + return lifespanTimeUnit; + } + + public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) { + this.lifespanTimeUnit = lifespanTimeUnit; + } + + @ManagedAttribute(description = "Is this route the master or a slave") + public boolean isLeader() { + return leader.get(); + } + + // ************************************************************************* + // + // ************************************************************************* + + @Listener(clustered = true, sync = false) + private final class EmbeddedCacheService extends ServiceSupport implements Runnable { + private Cache<String, String> cache; + private ScheduledFuture<?> future; + + public EmbeddedCacheService(Cache<String, String> cache) { + this.cache = cache; + this.future = null; + } + + @Override + protected void doStart() throws Exception { + this.future = executorService.scheduleAtFixedRate(this::run, 0, lifespan / 2, lifespanTimeUnit); + this.cache.addListener(this); + } + + @Override + protected void doStop() throws Exception { + this.cache.removeListener(this); + this.cache.remove(lockKey, lockValue); + + if (future != null) { + future.cancel(true); + future = null; + } + } + + @Override + public void run() { + if (!isRunAllowed() || !InfinispanRoutePolicy.this.isRunAllowed()) { + return; + } + + if (isLeader()) { + // I'm still the leader, so refresh the key so it does not expire. + if (!cache.replace(lockKey, lockValue, lockValue, lifespan, lifespanTimeUnit)) { + // Looks like I've lost the leadership. + setLeader(false); + } + } + + if (!isLeader()) { + Object result = cache.putIfAbsent(lockKey, lockValue, lifespan, lifespanTimeUnit); + if (result == null) { + // Acquired the key so I'm the leader. + setLeader(true); + } else if (ObjectHelper.equal(lockValue, result) && !isLeader()) { + // Hey, I may have recovered from failure (or reboot was really + // fast) and my key was still there so yeah, I'm the leader again! + setLeader(true); + } else { + setLeader(false); + } + } + } + + @CacheEntryRemoved + public void onCacheEntryRemoved(CacheEntryEvent<Object, Object> event) { + if (ObjectHelper.equal(lockKey, event.getKey())) { + run(); + } + } + @CacheEntryExpired + public void onCacheEntryExpired(CacheEntryEvent<Object, Object> event) { + if (ObjectHelper.equal(lockKey, event.getKey())) { + run(); + } + } + } + + @ClientListener + private final class RemoteCacheService extends ServiceSupport implements Runnable { + private RemoteCache<String, String> cache; + private ScheduledFuture<?> future; + private Long version; + + public RemoteCacheService(RemoteCache<String, String> cache) { + this.cache = cache; + this.future = null; + this.version = null; + } + + @Override + protected void doStart() throws Exception { + this.future = executorService.scheduleAtFixedRate(this::run, 0, lifespan / 2, lifespanTimeUnit); + this.cache.addClientListener(this); + } + + @Override + protected void doStop() throws Exception { + this.cache.removeClientListener(this); + + if (this.version != null) { + this.cache.removeWithVersion(lockKey, this.version); + } + + if (future != null) { + future.cancel(true); + future = null; + } + } + + @Override + public void run() { + if (!isRunAllowed() || !InfinispanRoutePolicy.this.isRunAllowed()) { + return; + } + + if (isLeader() && version != null) { + LOGGER.debug("Lock refresh key={} with version={}", lockKey, version); + + // I'm still the leader, so refresh the key so it does not expire. + if (!cache.replaceWithVersion(lockKey, lockValue, version, (int)lifespanTimeUnit.toSeconds(lifespan))) { + // Looks like I've lost the leadership. + setLeader(false); + } + } + + if (!isLeader()) { + Object result = cache.putIfAbsent(lockKey, lockValue, lifespan, lifespanTimeUnit); + if (result == null) { + // Acquired the key so I'm the leader. + setLeader(true); + + // Get the version + version = cache.getWithMetadata(lockKey).getVersion(); + + LOGGER.debug("Lock acquired key={} with version={}", lockKey, version); + } else if (ObjectHelper.equal(lockValue, result) && !isLeader()) { + // Hey, I may have recovered from failure (or reboot was really + // fast) and my key was still there so yeah, I'm the leader again! + setLeader(true); + + // Get the version + version = cache.getWithMetadata(lockKey).getVersion(); + + LOGGER.debug("Lock resumed key={} with version={}", lockKey, version); + } else { + setLeader(false); + } + } + } + + @ClientCacheEntryRemoved + public void onCacheEntryRemoved(ClientCacheEntryRemovedEvent<Object> event) { + if (ObjectHelper.equal(lockKey, event.getKey())) { + run(); + } + } + + @ClientCacheEntryExpired + public void onCacheEntryExpired(ClientCacheEntryExpiredEvent<Object> event) { + if (ObjectHelper.equal(lockKey, event.getKey())) { + run(); + } + } + } + + // ************************************************************************* + // Helpers + // ************************************************************************* + + public static InfinispanRoutePolicy withManager(BasicCacheContainer cacheContainer) { + InfinispanConfiguration conf = new InfinispanConfiguration(); + conf.setCacheContainer(cacheContainer); + + return new InfinispanRoutePolicy(conf); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanEmbeddedRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanEmbeddedRoutePolicyTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanEmbeddedRoutePolicyTest.java new file mode 100644 index 0000000..b029638 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanEmbeddedRoutePolicyTest.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.policy; + +import org.infinispan.commons.api.BasicCacheContainer; +import org.infinispan.manager.DefaultCacheManager; + +public class InfinispanEmbeddedRoutePolicyTest extends InfinispanRoutePolicyTestBase { + + @Override + protected BasicCacheContainer createCacheManager() throws Exception { + return new DefaultCacheManager(true); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRemoteRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRemoteRoutePolicyTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRemoteRoutePolicyTest.java new file mode 100644 index 0000000..5d80d77 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRemoteRoutePolicyTest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.policy; + +import java.util.Properties; + +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.infinispan.client.hotrod.configuration.ConfigurationBuilder; +import org.infinispan.commons.api.BasicCacheContainer; +import org.junit.Ignore; + +@Ignore("Disabled as it requires a transactional cache") +public class InfinispanRemoteRoutePolicyTest extends InfinispanRoutePolicyTestBase { + + @Override + protected BasicCacheContainer createCacheManager() throws Exception { + Properties props = new Properties(); + props.setProperty("infinispan.client.hotrod.server_list", "127.0.0.1:11222"); + props.setProperty("infinispan.client.hotrod.force_return_values", "true"); + + return new RemoteCacheManager( + new ConfigurationBuilder().withProperties(props).build(), + true + ); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java new file mode 100644 index 0000000..2091ed6 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.policy; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.infinispan.commons.api.BasicCacheContainer; +import org.junit.Assert; +import org.junit.Test; + +abstract class InfinispanRoutePolicyTestBase extends CamelTestSupport { + protected BasicCacheContainer cacheManager; + protected InfinispanRoutePolicy policy1; + protected InfinispanRoutePolicy policy2; + + @Override + protected void doPreSetup() throws Exception { + this.cacheManager = createCacheManager(); + + this.policy1 = InfinispanRoutePolicy.withManager(cacheManager); + this.policy1.setLockMapName("camel-route-policy"); + this.policy1.setLockKey("route-policy"); + this.policy1.setLockValue("route1"); + + this.policy2 = InfinispanRoutePolicy.withManager(cacheManager); + this.policy2.setLockMapName("camel-route-policy"); + this.policy2.setLockKey("route-policy"); + this.policy2.setLockValue("route2"); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + + if (this.cacheManager != null) { + this.cacheManager.stop(); + } + } + + protected abstract BasicCacheContainer createCacheManager() throws Exception; + + // ******************************************* + // + // ******************************************* + + @Test + public void testLeadership()throws Exception { + context.startRoute("route1"); + while(!policy1.isLeader()) { + Thread.sleep(250); + } + + context.startRoute("route2"); + Thread.sleep(500); + + Assert.assertTrue(policy1.isLeader()); + Assert.assertFalse(policy2.isLeader()); + + context.stopRoute("route1"); + while(!policy2.isLeader()) { + Thread.sleep(250); + } + + Assert.assertFalse(policy1.isLeader()); + Assert.assertTrue(policy2.isLeader()); + + context.startRoute("route1"); + Thread.sleep(500); + + Assert.assertFalse(policy1.isLeader()); + Assert.assertTrue(policy2.isLeader()); + + context.stopRoute("route2"); + while(!policy1.isLeader()) { + Thread.sleep(250); + } + + Assert.assertTrue(policy1.isLeader()); + Assert.assertFalse(policy2.isLeader()); + } + + // ******************************************* + // + // ******************************************* + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:route1") + .routeId("route1") + .autoStartup(false) + .routePolicy(policy1) + .to("log:org.apache.camel.component.infinispan.policy.1?level=INFO&showAll=true"); + from("direct:route2") + .routeId("route2") + .autoStartup(false) + .routePolicy(policy2) + .to("log:org.apache.camel.component.infinispan.policy.2?level=INFO&showAll=true"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/test/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/resources/log4j.xml b/components/camel-infinispan/src/test/resources/log4j.xml index 71a2b38..d041f18 100644 --- a/components/camel-infinispan/src/test/resources/log4j.xml +++ b/components/camel-infinispan/src/test/resources/log4j.xml @@ -64,7 +64,7 @@ <root> <priority value="INFO" /> - <appender-ref ref="FILE" /> + <appender-ref ref="CONSOLE" /> </root> </log4j:configuration>