This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new a4f6087 camel-infinispan: implement route policy using clustering facilities a4f6087 is described below commit a4f6087b2a48b6dbe10621f45ad975239a374e03 Author: Luca Burgazzoli <lburgazz...@gmail.com> AuthorDate: Wed Jan 27 18:15:52 2021 +0100 camel-infinispan: implement route policy using clustering facilities --- .../camel-infinispan-common/pom.xml | 4 + .../infinispan/InfinispanRoutePolicy.java | 246 ---------------- .../camel/component/infinispan/InfinispanUtil.java | 18 ++ .../cluster/InfinispanClusterConfiguration.java | 74 +++++ .../cluster/InfinispanClusterService.java | 23 ++ .../infinispan/cluster/InfinispanClusterView.java | 117 ++++++++ .../InfinispanRoutePolicyTestSupport.java | 58 ---- .../camel-infinispan-embedded/pom.xml | 14 +- .../embedded/InfinispanEmbeddedRoutePolicy.java | 150 ---------- .../InfinispanEmbeddedClusterConfiguration.java | 60 ++++ .../cluster/InfinispanEmbeddedClusterService.java | 102 +++++++ .../cluster/InfinispanEmbeddedClusterView.java | 289 +++++++++++++++++++ .../InfinispanEmbeddedRoutePolicyTest.java | 98 ------- .../AbstractInfinispanEmbeddedClusteredTest.java | 73 +++++ .../InfinispanEmbeddedClusteredMasterTest.java | 59 ++++ ...panEmbeddedClusteredRoutePolicyFactoryTest.java | 61 ++++ ...InfinispanEmbeddedClusteredRoutePolicyTest.java | 61 ++++ .../InfinispanEmbeddedClusteredTestSupport.java | 38 +++ .../InfinispanEmbeddedClusteredViewTest.java | 72 +++++ .../src/test/resources/log4j2.properties | 4 + .../camel-infinispan/camel-infinispan/pom.xml | 14 +- .../remote/InfinispanRemoteRoutePolicy.java | 174 ------------ .../InfinispanRemoteClusterConfiguration.java | 130 +++++++++ .../cluster/InfinispanRemoteClusterService.java | 98 +++++++ .../cluster/InfinispanRemoteClusterView.java | 316 +++++++++++++++++++++ .../remote/InfinispanRemoteRoutePolicyTest.java | 124 -------- .../AbstractInfinispanRemoteClusteredTest.java | 84 ++++++ .../InfinispanRemoteClusteredMasterTest.java | 59 ++++ ...ispanRemoteClusteredRoutePolicyFactoryTest.java | 61 ++++ .../InfinispanRemoteClusteredRoutePolicyTest.java | 61 ++++ .../InfinispanRemoteClusteredTestSupport.java | 53 ++++ .../cluster/InfinispanRemoteClusteredViewTest.java | 81 ++++++ .../src/test/resources/log4j2.properties | 2 + 33 files changed, 2020 insertions(+), 858 deletions(-) diff --git a/components/camel-infinispan/camel-infinispan-common/pom.xml b/components/camel-infinispan/camel-infinispan-common/pom.xml index 3989a98..1c857bf 100644 --- a/components/camel-infinispan/camel-infinispan-common/pom.xml +++ b/components/camel-infinispan/camel-infinispan-common/pom.xml @@ -42,6 +42,10 @@ <artifactId>camel-support</artifactId> </dependency> <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-cluster</artifactId> + </dependency> + <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-core</artifactId> <version>${infinispan-version}</version> diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanRoutePolicy.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanRoutePolicy.java deleted file mode 100644 index 93f5fe5..0000000 --- a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanRoutePolicy.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.infinispan; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.camel.CamelContext; -import org.apache.camel.CamelContextAware; -import org.apache.camel.Route; -import org.apache.camel.RuntimeCamelException; -import org.apache.camel.Service; -import org.apache.camel.api.management.ManagedAttribute; -import org.apache.camel.support.RoutePolicySupport; -import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.ReferenceCount; -import org.apache.camel.util.StringHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class InfinispanRoutePolicy extends RoutePolicySupport implements CamelContextAware { - private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanRoutePolicy.class); - - private final AtomicBoolean leader; - private final Set<Route> startedRoutes; - private final Set<Route> stoppeddRoutes; - private final ReferenceCount refCount; - - private CamelContext camelContext; - private boolean shouldStopRoute; - private String lockMapName; - private String lockKey; - private String lockValue; - private long lifespan; - private TimeUnit lifespanTimeUnit; - private Service service; - - protected InfinispanRoutePolicy(String lockKey, String lockValue) { - this.stoppeddRoutes = new HashSet<>(); - this.startedRoutes = new HashSet<>(); - this.leader = new AtomicBoolean(); - this.shouldStopRoute = true; - this.lockKey = lockKey; - this.lockValue = lockValue; - this.lifespan = 30; - this.lifespanTimeUnit = TimeUnit.SECONDS; - this.service = null; - this.refCount = ReferenceCount.on(this::startService, this::stopService); - } - - @Override - public CamelContext getCamelContext() { - return camelContext; - } - - @Override - public void setCamelContext(CamelContext camelContext) { - this.camelContext = camelContext; - } - - @Override - public synchronized void onInit(Route route) { - super.onInit(route); - - LOGGER.info("Route managed by {}. Setting route {} AutoStartup flag to false.", getClass(), route.getId()); - route.setAutoStartup(false); - - stoppeddRoutes.add(route); - - this.refCount.retain(); - - startManagedRoutes(); - } - - @Override - public synchronized void doShutdown() { - this.refCount.release(); - } - - protected abstract Service createService(); - - // **************************************** - // Helpers - // **************************************** - - private void startService() { - // validate - StringHelper.notEmpty(lockMapName, "lockMapName", this); - StringHelper.notEmpty(lockKey, "lockKey", this); - StringHelper.notEmpty(lockValue, "lockValue", this); - ObjectHelper.notNull(camelContext, "camelContext", this); - - try { - if (lifespanTimeUnit.convert(lifespan, TimeUnit.SECONDS) < 2) { - throw new IllegalArgumentException("Lock lifespan can not be less that 2 seconds"); - } - - this.service = createService(); - this.service.start(); - } catch (Exception e) { - throw new RuntimeCamelException(e); - } - } - - private void stopService() { - leader.set(false); - - try { - if (this.service != null) { - this.service.stop(); - } - } catch (Exception e) { - throw new RuntimeCamelException(e); - } - } - - protected void setLeader(boolean isLeader) { - if (isLeader && leader.compareAndSet(false, isLeader)) { - LOGGER.info("Leadership taken (map={}, key={}, val={})", lockMapName, lockKey, lockValue); - startManagedRoutes(); - } else if (!isLeader && leader.getAndSet(isLeader)) { - LOGGER.info("Leadership lost (map={}, key={} val={})", lockMapName, lockKey, lockValue); - stopManagedRoutes(); - } - } - - private synchronized void startManagedRoutes() { - if (!isLeader()) { - return; - } - - try { - for (Route route : stoppeddRoutes) { - LOGGER.debug("Starting route {}", route.getId()); - startRoute(route); - startedRoutes.add(route); - } - - stoppeddRoutes.removeAll(startedRoutes); - } catch (Exception e) { - handleException(e); - } - } - - private synchronized void stopManagedRoutes() { - if (isLeader()) { - return; - } - - try { - for (Route route : startedRoutes) { - LOGGER.debug("Stopping route {}", route.getId()); - stopRoute(route); - stoppeddRoutes.add(route); - } - - startedRoutes.removeAll(stoppeddRoutes); - } catch (Exception e) { - handleException(e); - } - } - - // ************************************************************************* - // Getter/Setters - // ************************************************************************* - - @ManagedAttribute(description = "Whether to stop route when starting up and failed to become master") - public boolean isShouldStopRoute() { - return shouldStopRoute; - } - - public void setShouldStopRoute(boolean shouldStopRoute) { - this.shouldStopRoute = shouldStopRoute; - } - - @ManagedAttribute(description = "The lock map name") - public String getLockMapName() { - return lockMapName; - } - - public void setLockMapName(String lockMapName) { - this.lockMapName = lockMapName; - } - - @ManagedAttribute(description = "The lock key") - public String getLockKey() { - return lockKey; - } - - public void setLockKey(String lockKey) { - this.lockKey = lockKey; - } - - @ManagedAttribute(description = "The lock value") - public String getLockValue() { - return lockValue; - } - - public void setLockValue(String lockValue) { - this.lockValue = lockValue; - } - - @ManagedAttribute(description = "The key lifespan for the lock") - public long getLifespan() { - return lifespan; - } - - public void setLifespan(long lifespan) { - this.lifespan = lifespan; - } - - public void setLifespan(long lifespan, TimeUnit lifespanTimeUnit) { - this.lifespan = lifespan; - this.lifespanTimeUnit = lifespanTimeUnit; - } - - @ManagedAttribute(description = "The key lifespan time unit for the lock") - public TimeUnit getLifespanTimeUnit() { - return lifespanTimeUnit; - } - - public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) { - this.lifespanTimeUnit = lifespanTimeUnit; - } - - @ManagedAttribute(description = "Is this route the master or a slave") - public boolean isLeader() { - return leader.get(); - } -} diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java index f9f0ebc..57edd34 100644 --- a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java +++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java @@ -23,6 +23,7 @@ import java.util.Properties; import java.util.concurrent.ScheduledExecutorService; import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.support.ResourceHelper; @@ -66,4 +67,21 @@ public class InfinispanUtil { source, source.getClass().getSimpleName()); } + + public static ScheduledExecutorService newSingleThreadScheduledExecutor( + CamelContextAware camelContextAware, Object source) { + return newSingleThreadScheduledExecutor(camelContextAware.getCamelContext(), source); + } + + public static ScheduledExecutorService newSingleThreadScheduledExecutor( + CamelContext camelContext, Object source, String id) { + return camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor( + source, + source.getClass().getSimpleName() + "-" + id); + } + + public static ScheduledExecutorService newSingleThreadScheduledExecutor( + CamelContextAware camelContextAware, Object source, String id) { + return newSingleThreadScheduledExecutor(camelContextAware.getCamelContext(), source, id); + } } diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterConfiguration.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterConfiguration.java new file mode 100644 index 0000000..617a95a --- /dev/null +++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterConfiguration.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.cluster; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.component.infinispan.InfinispanConfiguration; + +public abstract class InfinispanClusterConfiguration<C extends InfinispanConfiguration> implements Cloneable { + private final C configuration; + private long lifespan; + private TimeUnit lifespanTimeUnit; + + protected InfinispanClusterConfiguration(C configuration) { + this.configuration = configuration; + this.lifespan = 30; + this.lifespanTimeUnit = TimeUnit.SECONDS; + } + + // *********************************************** + // Properties + // *********************************************** + + public long getLifespan() { + return lifespan; + } + + public void setLifespan(long lifespan) { + this.lifespan = lifespan; + } + + public TimeUnit getLifespanTimeUnit() { + return lifespanTimeUnit; + } + + public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) { + this.lifespanTimeUnit = lifespanTimeUnit; + } + + public void setConfigurationUri(String configurationUri) { + configuration.setConfigurationUri(configurationUri); + } + + public C getConfiguration() { + return configuration; + } + + // *********************************************** + // + // *********************************************** + + @Override + public InfinispanClusterConfiguration clone() { + try { + return (InfinispanClusterConfiguration) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterService.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterService.java new file mode 100644 index 0000000..a6def76 --- /dev/null +++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterService.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.cluster; + +import org.apache.camel.support.cluster.AbstractCamelClusterService; + +public abstract class InfinispanClusterService extends AbstractCamelClusterService<InfinispanClusterView> { + public static final String LEADER_KEY = "__camel_leader"; +} diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterView.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterView.java new file mode 100644 index 0000000..3a4b880 --- /dev/null +++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterView.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.cluster; + +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.camel.cluster.CamelClusterMember; +import org.apache.camel.cluster.CamelClusterService; +import org.apache.camel.support.cluster.AbstractCamelClusterView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class InfinispanClusterView extends AbstractCamelClusterView { + private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanClusterView.class); + + protected InfinispanClusterView(CamelClusterService cluster, String namespace) { + super(cluster, namespace); + } + + protected abstract boolean isLeader(String id); + + // *********************************************** + // + // *********************************************** + + protected final class LocalMember implements CamelClusterMember { + private final AtomicBoolean leader = new AtomicBoolean(); + private final String id; + + public LocalMember(String id) { + this.id = id; + } + + public void setLeader(boolean master) { + if (master && this.leader.compareAndSet(false, true)) { + LOGGER.debug("Leadership taken for id: {}", id); + + fireLeadershipChangedEvent(Optional.of(this)); + return; + } + if (!master && this.leader.compareAndSet(true, false)) { + LOGGER.debug("Leadership lost for id: {}", id); + + fireLeadershipChangedEvent(getLeader()); + return; + } + } + + @Override + public boolean isLeader() { + return leader.get(); + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public String getId() { + return this.id; + } + + @Override + public String toString() { + return "LocalMember{" + "leader=" + leader + '}'; + } + } + + protected final class ClusterMember implements CamelClusterMember { + private final String id; + + public ClusterMember(String id) { + this.id = id; + } + + @Override + public String getId() { + return id; + } + + @Override + public boolean isLeader() { + return InfinispanClusterView.this.isLeader(id); + } + + @Override + public boolean isLocal() { + if (id == null) { + return false; + } + + return Objects.equals(id, getLocalMember().getId()); + } + + @Override + public String toString() { + return "ClusterMember{" + "id='" + id + '\'' + '}'; + } + } +} diff --git a/components/camel-infinispan/camel-infinispan-common/src/test/java/org/apache/camel/component/infinispan/InfinispanRoutePolicyTestSupport.java b/components/camel-infinispan/camel-infinispan-common/src/test/java/org/apache/camel/component/infinispan/InfinispanRoutePolicyTestSupport.java deleted file mode 100644 index ba5b6df..0000000 --- a/components/camel-infinispan/camel-infinispan-common/src/test/java/org/apache/camel/component/infinispan/InfinispanRoutePolicyTestSupport.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.infinispan; - -import java.util.concurrent.TimeUnit; - -import org.apache.camel.CamelContext; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.impl.DefaultCamelContext; -import org.junit.jupiter.api.Test; - -import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public interface InfinispanRoutePolicyTestSupport { - InfinispanRoutePolicy createRoutePolicy(String lockValue); - - @Test - default void testLeadership() throws Exception { - final InfinispanRoutePolicy policy1 = createRoutePolicy("route1"); - final InfinispanRoutePolicy policy2 = createRoutePolicy("route2"); - - try (CamelContext context = new DefaultCamelContext()) { - context.start(); - - RouteBuilder.addRoutes(context, b -> b.from("direct:r1").routePolicy(policy1).to("mock:p1")); - - await().atMost(10, TimeUnit.SECONDS).until(policy1::isLeader); - - RouteBuilder.addRoutes(context, b -> b.from("direct:r2").routePolicy(policy2).to("mock:p2")); - - assertTrue(policy1.isLeader()); - assertFalse(policy2.isLeader()); - - policy1.shutdown(); - - await().atMost(10, TimeUnit.SECONDS).until(policy2::isLeader); - - assertFalse(policy1.isLeader()); - assertTrue(policy2.isLeader()); - } - } -} diff --git a/components/camel-infinispan/camel-infinispan-embedded/pom.xml b/components/camel-infinispan/camel-infinispan-embedded/pom.xml index 34ed98f..b8a7bc4 100644 --- a/components/camel-infinispan/camel-infinispan-embedded/pom.xml +++ b/components/camel-infinispan/camel-infinispan-embedded/pom.xml @@ -35,10 +35,6 @@ <dependencies> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-support</artifactId> - </dependency> - <dependency> - <groupId>org.apache.camel</groupId> <artifactId>camel-infinispan-common</artifactId> </dependency> <dependency> @@ -65,6 +61,11 @@ <artifactId>camel-test-spring-junit5</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-master</artifactId> + <scope>test</scope> + </dependency> <!-- testing - infinispan --> <dependency> @@ -114,6 +115,11 @@ <artifactId>awaitility</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicy.java b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicy.java deleted file mode 100644 index 615a77c..0000000 --- a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicy.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.infinispan.embedded; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; - -import org.apache.camel.Service; -import org.apache.camel.api.management.ManagedResource; -import org.apache.camel.component.infinispan.InfinispanRoutePolicy; -import org.apache.camel.support.service.ServiceSupport; -import org.apache.camel.util.ObjectHelper; -import org.infinispan.Cache; -import org.infinispan.notifications.Listener; -import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired; -import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved; -import org.infinispan.notifications.cachelistener.event.CacheEntryEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@ManagedResource(description = "Route policy using Infinispan Embedded as clustered lock") -public class InfinispanEmbeddedRoutePolicy extends InfinispanRoutePolicy { - private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanEmbeddedRoutePolicy.class); - - private final InfinispanEmbeddedManager manager; - - public InfinispanEmbeddedRoutePolicy(InfinispanEmbeddedConfiguration configuration) { - this(configuration, null, null); - } - - public InfinispanEmbeddedRoutePolicy(InfinispanEmbeddedConfiguration configuration, String lockKey, String lockValue) { - super(lockKey, lockValue); - - this.manager = new InfinispanEmbeddedManager(configuration); - } - - @Override - protected Service createService() { - return new EmbeddedCacheService(); - } - - @Override - public void doStart() throws Exception { - super.doStart(); - this.manager.start(); - } - - @Override - public void doStop() throws Exception { - super.doStop(); - this.manager.stop(); - } - - // ************************************************************************* - // - // ************************************************************************* - - @Listener(clustered = true, sync = false) - private final class EmbeddedCacheService extends ServiceSupport { - private Cache<String, String> cache; - private ScheduledExecutorService executorService; - private ScheduledFuture<?> future; - - EmbeddedCacheService() { - } - - @SuppressWarnings("unchecked") - @Override - protected void doStart() throws Exception { - this.executorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, - getClass().getSimpleName()); - this.cache = manager.getCache(getLockMapName(), Cache.class); - this.cache.addListener(this); - this.future = executorService.scheduleAtFixedRate(this::run, 0, getLifespan() / 2, getLifespanTimeUnit()); - } - - @Override - protected void doStop() throws Exception { - if (cache != null) { - this.cache.removeListener(this); - this.cache.remove(getLockKey(), getLockValue()); - } - - if (future != null) { - future.cancel(true); - future = null; - } - - getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService); - } - - private void run() { - if (!isRunAllowed()) { - return; - } - - if (isLeader()) { - // I'm still the leader, so refresh the key so it does not expire. - if (!cache.replace(getLockKey(), getLockValue(), getLockValue(), getLifespan(), getLifespanTimeUnit())) { - // Looks like I've lost the leadership. - setLeader(false); - } - } - - if (!isLeader()) { - Object result = cache.putIfAbsent(getLockKey(), getLockValue(), getLifespan(), getLifespanTimeUnit()); - if (result == null) { - // Acquired the key so I'm the leader. - setLeader(true); - } else if (ObjectHelper.equal(getLockValue(), result) && !isLeader()) { - // Hey, I may have recovered from failure (or reboot was really - // fast) and my key was still there so yeah, I'm the leader again! - setLeader(true); - } else { - setLeader(false); - } - } - } - - @CacheEntryRemoved - public void onCacheEntryRemoved(CacheEntryEvent<Object, Object> event) { - LOGGER.debug("onCacheEntryExpired lock-key={}, event-key={}", getLockKey(), event.getKey()); - if (ObjectHelper.equal(getLockKey(), event.getKey())) { - run(); - } - } - - @CacheEntryExpired - public void onCacheEntryExpired(CacheEntryEvent<Object, Object> event) { - LOGGER.debug("onCacheEntryExpired lock-key={}, event-key={}", getLockKey(), event.getKey()); - if (ObjectHelper.equal(getLockKey(), event.getKey())) { - run(); - } - } - } -} diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterConfiguration.java b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterConfiguration.java new file mode 100644 index 0000000..45a698c --- /dev/null +++ b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterConfiguration.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded.cluster; + +import org.apache.camel.component.infinispan.cluster.InfinispanClusterConfiguration; +import org.apache.camel.component.infinispan.embedded.InfinispanEmbeddedConfiguration; +import org.infinispan.configuration.cache.Configuration; +import org.infinispan.manager.EmbeddedCacheManager; + +public class InfinispanEmbeddedClusterConfiguration + extends InfinispanClusterConfiguration<InfinispanEmbeddedConfiguration> + implements Cloneable { + + public InfinispanEmbeddedClusterConfiguration() { + super(new InfinispanEmbeddedConfiguration()); + } + + // *********************************************** + // Properties + // *********************************************** + + public EmbeddedCacheManager getCacheContainer() { + return getConfiguration().getCacheContainer(); + } + + public void setCacheContainer(EmbeddedCacheManager cacheContainer) { + getConfiguration().setCacheContainer(cacheContainer); + } + + public Configuration getCacheContainerConfiguration() { + return getConfiguration().getCacheContainerConfiguration(); + } + + public void setCacheContainerConfiguration(Configuration cacheContainerConfiguration) { + getConfiguration().setCacheContainerConfiguration(cacheContainerConfiguration); + } + + // *********************************************** + // + // *********************************************** + + @Override + public InfinispanEmbeddedClusterConfiguration clone() { + return (InfinispanEmbeddedClusterConfiguration) super.clone(); + } +} diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterService.java b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterService.java new file mode 100644 index 0000000..d471d20 --- /dev/null +++ b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterService.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded.cluster; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.component.infinispan.cluster.InfinispanClusterService; +import org.apache.camel.component.infinispan.cluster.InfinispanClusterView; +import org.apache.camel.util.ObjectHelper; +import org.infinispan.configuration.cache.Configuration; +import org.infinispan.manager.EmbeddedCacheManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InfinispanEmbeddedClusterService extends InfinispanClusterService { + private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanEmbeddedClusterService.class); + + private InfinispanEmbeddedClusterConfiguration configuration; + + public InfinispanEmbeddedClusterService() { + this.configuration = new InfinispanEmbeddedClusterConfiguration(); + } + + public InfinispanEmbeddedClusterService(InfinispanEmbeddedClusterConfiguration configuration) { + this.configuration = configuration.clone(); + } + + // ********************************************* + // Properties + // ********************************************* + + public InfinispanEmbeddedClusterConfiguration getConfiguration() { + return configuration; + } + + public void setConfiguration(InfinispanEmbeddedClusterConfiguration configuration) { + this.configuration = configuration.clone(); + } + + public void setConfigurationUri(String configurationUri) { + configuration.setConfigurationUri(configurationUri); + } + + public EmbeddedCacheManager getCacheContainer() { + return configuration.getCacheContainer(); + } + + public void setCacheContainer(EmbeddedCacheManager cacheContainer) { + configuration.setCacheContainer(cacheContainer); + } + + public Configuration getCacheContainerConfiguration() { + return configuration.getCacheContainerConfiguration(); + } + + public void setCacheContainerConfiguration(Configuration cacheContainerConfiguration) { + configuration.setCacheContainerConfiguration(cacheContainerConfiguration); + } + + public long getLifespan() { + return configuration.getLifespan(); + } + + public void setLifespan(long lifespan) { + configuration.setLifespan(lifespan); + } + + public TimeUnit getLifespanTimeUnit() { + return configuration.getLifespanTimeUnit(); + } + + public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) { + configuration.setLifespanTimeUnit(lifespanTimeUnit); + } + + // ********************************************* + // Impl + // ********************************************* + + @Override + protected InfinispanClusterView createView(String namespace) throws Exception { + // Validate parameters + ObjectHelper.notNull(getCamelContext(), "Camel Context"); + ObjectHelper.notNull(getId(), "Cluster ID"); + + return new InfinispanEmbeddedClusterView(this, configuration, namespace); + } +} diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterView.java b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterView.java new file mode 100644 index 0000000..2d09cf9 --- /dev/null +++ b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterView.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded.cluster; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import org.apache.camel.cluster.CamelClusterMember; +import org.apache.camel.component.infinispan.InfinispanUtil; +import org.apache.camel.component.infinispan.cluster.InfinispanClusterService; +import org.apache.camel.component.infinispan.cluster.InfinispanClusterView; +import org.apache.camel.component.infinispan.embedded.InfinispanEmbeddedManager; +import org.apache.camel.support.service.ServiceHelper; +import org.apache.camel.support.service.ServiceSupport; +import org.infinispan.Cache; +import org.infinispan.notifications.Listener; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved; +import org.infinispan.notifications.cachelistener.event.CacheEntryEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.util.function.Predicates.negate; + +public class InfinispanEmbeddedClusterView extends InfinispanClusterView { + private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanEmbeddedClusterService.class); + + private final InfinispanEmbeddedClusterConfiguration configuration; + private final InfinispanEmbeddedManager manager; + private final LocalMember localMember; + private final LeadershipService leadership; + + private Cache<String, String> cache; + + protected InfinispanEmbeddedClusterView( + InfinispanEmbeddedClusterService cluster, + InfinispanEmbeddedClusterConfiguration configuration, + String namespace) { + super(cluster, namespace); + + this.configuration = configuration; + this.manager = new InfinispanEmbeddedManager(this.configuration.getConfiguration()); + this.leadership = new LeadershipService(); + this.localMember = new LocalMember(cluster.getId()); + } + + @SuppressWarnings("unchecked") + @Override + public void doStart() throws Exception { + super.doStart(); + + ServiceHelper.startService(manager); + + this.cache = manager.getCache(getNamespace(), Cache.class); + + ServiceHelper.startService(leadership); + } + + @Override + public void doStop() throws Exception { + super.doStop(); + + ServiceHelper.stopService(leadership); + ServiceHelper.stopService(manager); + + this.cache = null; + } + + @Override + public CamelClusterMember getLocalMember() { + return this.localMember; + } + + @Override + public List<CamelClusterMember> getMembers() { + return this.cache != null + ? cache.keySet().stream() + .filter(negate(InfinispanClusterService.LEADER_KEY::equals)) + .map(ClusterMember::new) + .collect(Collectors.toList()) + : Collections.emptyList(); + } + + @Override + public Optional<CamelClusterMember> getLeader() { + if (this.cache == null) { + return Optional.empty(); + } + + String id = cache.get(InfinispanClusterService.LEADER_KEY); + if (id == null) { + return Optional.empty(); + } + + return Optional.of(new ClusterMember(id)); + } + + @Override + protected boolean isLeader(String id) { + if (this.cache == null) { + return false; + } + if (id == null) { + return false; + } + + final String key = InfinispanClusterService.LEADER_KEY; + final String val = this.cache.get(key); + + return Objects.equals(id, val); + } + + // ***************************************** + // + // Service + // + // ***************************************** + + @Listener(clustered = true, sync = false) + private final class LeadershipService extends ServiceSupport { + private final AtomicBoolean running; + private ScheduledExecutorService executorService; + + LeadershipService() { + this.running = new AtomicBoolean(false); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + this.running.set(true); + this.executorService = InfinispanUtil.newSingleThreadScheduledExecutor( + getCamelContext(), + this, + getLocalMember().getId()); + + // register the local member to the inventory + cache.put( + getLocalMember().getId(), + "false", + configuration.getLifespan(), + configuration.getLifespanTimeUnit()); + + cache.addListener(this); + + executorService.scheduleAtFixedRate( + this::run, + 0, + configuration.getLifespan() / 2, + configuration.getLifespanTimeUnit()); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + this.running.set(false); + + if (cache != null) { + cache.removeListener(this); + } + + getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService); + + if (cache != null) { + cache.remove(InfinispanClusterService.LEADER_KEY, getClusterService().getId()); + + LOGGER.info("Removing local member, key={}", getLocalMember().getId()); + cache.remove(getLocalMember().getId()); + } + } + + private boolean isLeader() { + return getLocalMember().isLeader(); + } + + private void setLeader(boolean leader) { + ((LocalMember) getLocalMember()).setLeader(leader); + } + + private synchronized void run() { + if (!running.get()) { + return; + } + + final String leaderKey = InfinispanClusterService.LEADER_KEY; + final String localId = getLocalMember().getId(); + + if (isLeader()) { + LOGGER.debug("Lock refresh key={}, id{}", leaderKey, localId); + + // I'm still the leader, so refresh the key so it does not expire. + if (!cache.replace( + InfinispanClusterService.LEADER_KEY, + getClusterService().getId(), + getClusterService().getId(), + configuration.getLifespan(), + configuration.getLifespanTimeUnit())) { + + LOGGER.debug("Failed to refresh the lock key={}, id={}", leaderKey, localId); + + // Looks like I've lost the leadership. + setLeader(false); + } + } + if (!isLeader()) { + LOGGER.debug("Try to acquire lock key={}, id={}", leaderKey, localId); + + Object result = cache.putIfAbsent( + InfinispanClusterService.LEADER_KEY, + getClusterService().getId(), + configuration.getLifespan(), + configuration.getLifespanTimeUnit()); + if (result == null) { + LOGGER.debug("Lock acquired key={}, id={}", leaderKey, localId); + // Acquired the key so I'm the leader. + setLeader(true); + } else if (Objects.equals(getClusterService().getId(), result) && !isLeader()) { + LOGGER.debug("Lock resumed key={}, id={}", leaderKey, localId); + // Hey, I may have recovered from failure (or reboot was really + // fast) and my key was still there so yeah, I'm the leader again! + setLeader(true); + } else { + LOGGER.debug("Failed to acquire the lock key={}, id={}", leaderKey, localId); + setLeader(false); + } + } + + // refresh local membership + cache.put( + getLocalMember().getId(), + isLeader() ? "true" : "false", + configuration.getLifespan(), + configuration.getLifespanTimeUnit()); + } + + @CacheEntryRemoved + public void onCacheEntryRemoved(CacheEntryEvent<Object, Object> event) { + if (!running.get()) { + return; + } + + LOGGER.debug("onCacheEntryRemoved id={}, lock-key={}, event-key={}", + getLocalMember().getId(), + InfinispanClusterService.LEADER_KEY, + event.getKey()); + + if (Objects.equals(InfinispanClusterService.LEADER_KEY, event.getKey())) { + executorService.execute(this::run); + } + } + + @CacheEntryExpired + public void onCacheEntryExpired(CacheEntryEvent<Object, Object> event) { + if (!running.get()) { + return; + } + + LOGGER.debug("onCacheEntryExpired id={}, lock-key={}, event-key={}", + getLocalMember().getId(), + InfinispanClusterService.LEADER_KEY, + event.getKey()); + + if (Objects.equals(InfinispanClusterService.LEADER_KEY, event.getKey())) { + executorService.execute(this::run); + } + } + } +} diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicyTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicyTest.java deleted file mode 100644 index 9c02b7a..0000000 --- a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicyTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.infinispan.embedded; - -import java.util.concurrent.TimeUnit; - -import org.apache.camel.CamelContext; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.infinispan.InfinispanRoutePolicy; -import org.apache.camel.impl.DefaultCamelContext; -import org.infinispan.commons.api.CacheContainerAdmin; -import org.infinispan.configuration.cache.CacheMode; -import org.infinispan.configuration.cache.ConfigurationBuilder; -import org.infinispan.manager.DefaultCacheManager; -import org.infinispan.manager.EmbeddedCacheManager; -import org.junit.jupiter.api.Test; - -import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class InfinispanEmbeddedRoutePolicyTest { - public static final String CACHE_NAME = "_route_policy"; - - @Test - public void testLeadership() throws Exception { - try (EmbeddedCacheManager cacheContainer = createCacheContainer()) { - - cacheContainer.start(); - - final InfinispanRoutePolicy policy1 = createRoutePolicy(cacheContainer, "route1"); - final InfinispanRoutePolicy policy2 = createRoutePolicy(cacheContainer, "route2"); - - try (CamelContext context = new DefaultCamelContext()) { - context.start(); - - RouteBuilder.addRoutes(context, b -> b.from("direct:r1").routePolicy(policy1).to("mock:p1")); - - await().atMost(10, TimeUnit.SECONDS).until(policy1::isLeader); - - RouteBuilder.addRoutes(context, b -> b.from("direct:r2").routePolicy(policy2).to("mock:p2")); - - assertTrue(policy1.isLeader()); - assertFalse(policy2.isLeader()); - - policy1.shutdown(); - - await().atMost(10, TimeUnit.SECONDS).until(policy2::isLeader); - - assertFalse(policy1.isLeader()); - assertTrue(policy2.isLeader()); - } - } - } - - // ***************************** - // - // ***************************** - - private static EmbeddedCacheManager createCacheContainer() { - DefaultCacheManager cacheContainer = new DefaultCacheManager(); - cacheContainer.administration() - .withFlags(CacheContainerAdmin.AdminFlag.VOLATILE) - .getOrCreateCache( - CACHE_NAME, - new ConfigurationBuilder() - .clustering().cacheMode(CacheMode.LOCAL) - .build()); - - return cacheContainer; - } - - private static InfinispanRoutePolicy createRoutePolicy(EmbeddedCacheManager cacheContainer, String lockValue) { - InfinispanEmbeddedConfiguration configuration = new InfinispanEmbeddedConfiguration(); - configuration.setCacheContainer(cacheContainer); - - InfinispanEmbeddedRoutePolicy policy = new InfinispanEmbeddedRoutePolicy(configuration); - policy.setLockMapName(CACHE_NAME); - policy.setLockKey("lock-key"); - policy.setLockValue(lockValue); - - return policy; - } -} diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/AbstractInfinispanEmbeddedClusteredTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/AbstractInfinispanEmbeddedClusteredTest.java new file mode 100644 index 0000000..4b1aaff --- /dev/null +++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/AbstractInfinispanEmbeddedClusteredTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded.cluster; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.infinispan.manager.DefaultCacheManager; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.assertj.core.api.Assertions.assertThat; + +abstract class AbstractInfinispanEmbeddedClusteredTest { + @Timeout(value = 1, unit = TimeUnit.MINUTES) + @Test + public void test() throws Exception { + final Logger logger = LoggerFactory.getLogger(getClass()); + final List<String> clients = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList()); + final List<String> results = new ArrayList<>(); + final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(clients.size() * 2); + final CountDownLatch latch = new CountDownLatch(clients.size()); + final String viewName = "myView"; + + try (DefaultCacheManager cacheContainer = new DefaultCacheManager()) { + InfinispanEmbeddedClusteredTestSupport.createCache(cacheContainer, viewName); + + for (String id : clients) { + scheduler.submit(() -> { + try { + run(cacheContainer, viewName, id); + logger.debug("Node {} is shutting down", id); + results.add(id); + } catch (Exception e) { + logger.warn("", e); + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + scheduler.shutdownNow(); + + assertThat(results).hasSameSizeAs(clients); + assertThat(results).containsAll(clients); + } + } + + protected abstract void run(DefaultCacheManager cacheContainer, String namespace, String id) throws Exception; +} diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredMasterTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredMasterTest.java new file mode 100644 index 0000000..37372d4 --- /dev/null +++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredMasterTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded.cluster; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.infinispan.manager.DefaultCacheManager; + +public class InfinispanEmbeddedClusteredMasterTest extends AbstractInfinispanEmbeddedClusteredTest { + @Override + protected void run(DefaultCacheManager cacheContainer, String namespace, String id) throws Exception { + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); + + //Set up a single node cluster. + InfinispanEmbeddedClusterService clusterService = new InfinispanEmbeddedClusterService(); + clusterService.setCacheContainer(cacheContainer); + clusterService.setId("node-" + id); + + try (DefaultCamelContext context = new DefaultCamelContext()) { + context.disableJMX(); + context.setName("context-" + id); + context.addService(clusterService); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + fromF("master:%s:timer:%s?delay=1000&period=1000", namespace, id) + .routeId("route-" + id) + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + } + } +} diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyFactoryTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyFactoryTest.java new file mode 100644 index 0000000..08d611d --- /dev/null +++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyFactoryTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded.cluster; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.cluster.ClusteredRoutePolicyFactory; +import org.infinispan.manager.DefaultCacheManager; + +public class InfinispanEmbeddedClusteredRoutePolicyFactoryTest extends AbstractInfinispanEmbeddedClusteredTest { + @Override + protected void run(DefaultCacheManager cacheContainer, String namespace, String id) throws Exception { + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); + + //Set up a single node cluster. + InfinispanEmbeddedClusterService clusterService = new InfinispanEmbeddedClusterService(); + clusterService.setCacheContainer(cacheContainer); + clusterService.setId("node-" + id); + + try (DefaultCamelContext context = new DefaultCamelContext()) { + context.disableJMX(); + context.setName("context-" + id); + context.addService(clusterService); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace(namespace)); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + fromF("timer:%s?delay=1000&period=1000", id) + .routeId("route-" + id) + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + } + } +} diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyTest.java new file mode 100644 index 0000000..010c0ce --- /dev/null +++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded.cluster; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.cluster.ClusteredRoutePolicy; +import org.infinispan.manager.DefaultCacheManager; + +public class InfinispanEmbeddedClusteredRoutePolicyTest extends AbstractInfinispanEmbeddedClusteredTest { + @Override + protected void run(DefaultCacheManager cacheContainer, String namespace, String id) throws Exception { + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); + + //Set up a single node cluster. + InfinispanEmbeddedClusterService clusterService = new InfinispanEmbeddedClusterService(); + clusterService.setCacheContainer(cacheContainer); + clusterService.setId("node-" + id); + + try (DefaultCamelContext context = new DefaultCamelContext()) { + context.disableJMX(); + context.setName("context-" + id); + context.addService(clusterService); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + fromF("timer:%s?delay=1000&period=1000", id) + .routeId("route-" + id) + .routePolicy(ClusteredRoutePolicy.forNamespace(namespace)) + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + } + } +} diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredTestSupport.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredTestSupport.java new file mode 100644 index 0000000..bc8dbde --- /dev/null +++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredTestSupport.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded.cluster; + +import org.infinispan.commons.api.CacheContainerAdmin; +import org.infinispan.configuration.cache.CacheMode; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.manager.DefaultCacheManager; + +public final class InfinispanEmbeddedClusteredTestSupport { + private InfinispanEmbeddedClusteredTestSupport() { + } + + public static void createCache(DefaultCacheManager cacheContainer, String cacheName) { + cacheContainer.administration() + .withFlags(CacheContainerAdmin.AdminFlag.VOLATILE) + .getOrCreateCache( + cacheName, + new ConfigurationBuilder() + .clustering() + .cacheMode(CacheMode.LOCAL) + .build()); + } +} diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredViewTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredViewTest.java new file mode 100644 index 0000000..d15efc6 --- /dev/null +++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredViewTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.embedded.cluster; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.cluster.CamelClusterMember; +import org.apache.camel.cluster.CamelClusterView; +import org.apache.camel.impl.DefaultCamelContext; +import org.infinispan.manager.DefaultCacheManager; +import org.junit.jupiter.api.Test; + +import static org.apache.camel.component.infinispan.embedded.cluster.InfinispanEmbeddedClusteredTestSupport.createCache; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class InfinispanEmbeddedClusteredViewTest { + @Test + public void getLeaderTest() throws Exception { + final String viewName = "myView"; + + try (DefaultCacheManager cacheContainer = new DefaultCacheManager()) { + createCache(cacheContainer, viewName); + + //Set up a single node cluster. + InfinispanEmbeddedClusterService clusterService = new InfinispanEmbeddedClusterService(); + clusterService.setCacheContainer(cacheContainer); + clusterService.setId("node"); + + //Set up context with single locked route. + try (DefaultCamelContext context = new DefaultCamelContext()) { + context.disableJMX(); + context.addService(clusterService); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + fromF("master:%s:timer:infinispan?repeatCount=1", viewName) + .routeId("route1") + .stop(); + } + }); + + context.start(); + + CamelClusterView view = clusterService.getView(viewName); + + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(view.getLeader()) + .get() + .satisfies(CamelClusterMember::isLeader) + .satisfies(CamelClusterMember::isLocal); + }); + } + } + } +} diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/resources/log4j2.properties b/components/camel-infinispan/camel-infinispan-embedded/src/test/resources/log4j2.properties index 1301f96..b740e62 100644 --- a/components/camel-infinispan/camel-infinispan-embedded/src/test/resources/log4j2.properties +++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/resources/log4j2.properties @@ -35,6 +35,10 @@ logger.infinispan-camel.level = INFO logger.infinispan-camel-remote.name = org.apache.camel.component.infinispan.remote logger.infinispan-camel-remote.level = INFO logger.infinispan-camel-embedded.name = org.apache.camel.component.infinispan.embedded +logger.infinispan-camel-embedded.level = INFO +logger.infinispan-camel-remote-cluster.name = org.apache.camel.component.infinispan.embedded.cluster +logger.infinispan-camel-remote-cluster.level = DEBUG + logger.infinispan-test-infra-container.name = container.infinispan logger.infinispan-test-infra-container.level = INFO logger.infinispan-camel-test.name = org.apache.camel.test.junit5 diff --git a/components/camel-infinispan/camel-infinispan/pom.xml b/components/camel-infinispan/camel-infinispan/pom.xml index 7db9c45..4b7b32b 100644 --- a/components/camel-infinispan/camel-infinispan/pom.xml +++ b/components/camel-infinispan/camel-infinispan/pom.xml @@ -35,10 +35,6 @@ <dependencies> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-support</artifactId> - </dependency> - <dependency> - <groupId>org.apache.camel</groupId> <artifactId>camel-infinispan-common</artifactId> </dependency> <dependency> @@ -70,6 +66,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-master</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-infinispan-common</artifactId> <version>${project.version}</version> <type>test-jar</type> @@ -144,6 +145,11 @@ <artifactId>awaitility</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> </dependencies> diff --git a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicy.java b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicy.java deleted file mode 100644 index 71774e5..0000000 --- a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicy.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.infinispan.remote; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; - -import org.apache.camel.Service; -import org.apache.camel.api.management.ManagedResource; -import org.apache.camel.component.infinispan.InfinispanRoutePolicy; -import org.apache.camel.component.infinispan.InfinispanUtil; -import org.apache.camel.support.service.ServiceSupport; -import org.apache.camel.util.ObjectHelper; -import org.infinispan.client.hotrod.Flag; -import org.infinispan.client.hotrod.RemoteCache; -import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired; -import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved; -import org.infinispan.client.hotrod.annotation.ClientListener; -import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent; -import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@ManagedResource(description = "Route policy using Infinispan Remote as clustered lock") -public class InfinispanRemoteRoutePolicy extends InfinispanRoutePolicy { - private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanRemoteRoutePolicy.class); - - private final InfinispanRemoteManager manager; - - public InfinispanRemoteRoutePolicy(InfinispanRemoteConfiguration configuration) { - this(configuration, null, null); - } - - public InfinispanRemoteRoutePolicy(InfinispanRemoteConfiguration configuration, String lockKey, String lockValue) { - super(lockKey, lockValue); - - this.manager = new InfinispanRemoteManager(configuration); - } - - @Override - protected Service createService() { - return new RemoteCacheService(); - } - - @Override - public void doStart() throws Exception { - super.doStart(); - this.manager.start(); - } - - @Override - public void doStop() throws Exception { - super.doStop(); - this.manager.stop(); - } - - // ************************************************************************* - // - // ************************************************************************* - - @ClientListener - private final class RemoteCacheService extends ServiceSupport { - private final int lifespan; - - private RemoteCache<String, String> cache; - private ScheduledExecutorService executorService; - private ScheduledFuture<?> future; - private Long version; - - RemoteCacheService() { - this.lifespan = (int) getLifespanTimeUnit().toSeconds(getLifespan()); - } - - @SuppressWarnings("unchecked") - @Override - protected void doStart() throws Exception { - this.executorService = InfinispanUtil.newSingleThreadScheduledExecutor(getCamelContext(), this); - this.cache = manager.getCache(getLockMapName(), RemoteCache.class); - this.cache.addClientListener(this); - this.future = executorService.scheduleAtFixedRate(this::run, 0, getLifespan() / 2, getLifespanTimeUnit()); - } - - @Override - protected void doStop() throws Exception { - if (cache != null) { - this.cache.removeClientListener(this); - - if (this.version != null) { - this.cache.removeWithVersion(getLockKey(), this.version); - } - } - - if (future != null) { - future.cancel(true); - future = null; - } - - getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService); - } - - private void run() { - if (!isRunAllowed()) { - return; - } - - if (isLeader() && version != null) { - LOGGER.debug("Lock refresh key={} with version={}", getLockKey(), version); - - // I'm still the leader, so refresh the key so it does not expire. - if (!cache.replaceWithVersion(getLockKey(), getLockValue(), version, lifespan)) { - setLeader(false); - } else { - version = cache.getWithMetadata(getLockKey()).getVersion(); - LOGGER.debug("Lock refreshed key={} with new version={}", getLockKey(), version); - } - } - - if (!isLeader()) { - Object result = cache.withFlags(Flag.FORCE_RETURN_VALUE).putIfAbsent(getLockKey(), getLockValue(), - getLifespan(), getLifespanTimeUnit()); - if (result == null) { - // Acquired the key so I'm the leader. - setLeader(true); - - // Get the version - version = cache.getWithMetadata(getLockKey()).getVersion(); - - LOGGER.debug("Lock acquired key={} with version={}", getLockKey(), version); - } else if (ObjectHelper.equal(getLockValue(), result) && !isLeader()) { - // Hey, I may have recovered from failure (or reboot was really - // fast) and my key was still there so yeah, I'm the leader again! - setLeader(true); - - // Get the version - version = cache.getWithMetadata(getLockKey()).getVersion(); - - LOGGER.debug("Lock resumed key={} with version={}", getLockKey(), version); - } else { - setLeader(false); - } - } - } - - @ClientCacheEntryRemoved - public void onCacheEntryRemoved(ClientCacheEntryRemovedEvent<String> event) { - LOGGER.debug("onCacheEntryRemoved lock-key={}, event-key={}", getLockKey(), event.getKey()); - if (ObjectHelper.equal(getLockKey(), event.getKey())) { - run(); - } - } - - @ClientCacheEntryExpired - public void onCacheEntryExpired(ClientCacheEntryExpiredEvent<String> event) { - LOGGER.debug("onCacheEntryExpired lock-key={}, event-key={}", getLockKey(), event.getKey()); - if (ObjectHelper.equal(getLockKey(), event.getKey())) { - run(); - } - } - } -} diff --git a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterConfiguration.java b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterConfiguration.java new file mode 100644 index 0000000..54682a3 --- /dev/null +++ b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterConfiguration.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.remote.cluster; + +import java.util.Map; + +import org.apache.camel.component.infinispan.cluster.InfinispanClusterConfiguration; +import org.apache.camel.component.infinispan.remote.InfinispanRemoteConfiguration; +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.infinispan.client.hotrod.configuration.Configuration; + +public class InfinispanRemoteClusterConfiguration + extends InfinispanClusterConfiguration<InfinispanRemoteConfiguration> + implements Cloneable { + + public InfinispanRemoteClusterConfiguration() { + super(new InfinispanRemoteConfiguration()); + } + + // *********************************************** + // Properties + // *********************************************** + + public String getHosts() { + return getConfiguration().getHosts(); + } + + public void setHosts(String hosts) { + getConfiguration().setHosts(hosts); + } + + public boolean isSecure() { + return getConfiguration().isSecure(); + } + + public void setSecure(boolean secure) { + getConfiguration().setSecure(secure); + } + + public String getUsername() { + return getConfiguration().getUsername(); + } + + public void setUsername(String username) { + getConfiguration().setUsername(username); + } + + public String getPassword() { + return getConfiguration().getPassword(); + } + + public void setPassword(String password) { + getConfiguration().setPassword(password); + } + + public String getSaslMechanism() { + return getConfiguration().getSaslMechanism(); + } + + public void setSaslMechanism(String saslMechanism) { + getConfiguration().setSaslMechanism(saslMechanism); + } + + public String getSecurityRealm() { + return getConfiguration().getSecurityRealm(); + } + + public void setSecurityRealm(String securityRealm) { + getConfiguration().setSecurityRealm(securityRealm); + } + + public String getSecurityServerName() { + return getConfiguration().getSecurityServerName(); + } + + public void setSecurityServerName(String securityServerName) { + getConfiguration().setSecurityServerName(securityServerName); + } + + public Map<String, String> getConfigurationProperties() { + return getConfiguration().getConfigurationProperties(); + } + + public void setConfigurationProperties(Map<String, String> configurationProperties) { + getConfiguration().setConfigurationProperties(configurationProperties); + } + + public void addConfigurationProperty(String key, String value) { + getConfiguration().addConfigurationProperty(key, value); + } + + public RemoteCacheManager getCacheContainer() { + return getConfiguration().getCacheContainer(); + } + + public void setCacheContainer(RemoteCacheManager cacheContainer) { + getConfiguration().setCacheContainer(cacheContainer); + } + + public Configuration getCacheContainerConfiguration() { + return getConfiguration().getCacheContainerConfiguration(); + } + + public void setCacheContainerConfiguration(Configuration cacheContainerConfiguration) { + getConfiguration().setCacheContainerConfiguration(cacheContainerConfiguration); + } + + // *********************************************** + // + // *********************************************** + + @Override + public InfinispanRemoteClusterConfiguration clone() { + return (InfinispanRemoteClusterConfiguration) super.clone(); + } +} diff --git a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterService.java b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterService.java new file mode 100644 index 0000000..bbfd461 --- /dev/null +++ b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterService.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.remote.cluster; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.component.infinispan.cluster.InfinispanClusterService; +import org.apache.camel.component.infinispan.cluster.InfinispanClusterView; +import org.apache.camel.util.ObjectHelper; +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.infinispan.client.hotrod.configuration.Configuration; + +public class InfinispanRemoteClusterService extends InfinispanClusterService { + private InfinispanRemoteClusterConfiguration configuration; + + public InfinispanRemoteClusterService() { + this.configuration = new InfinispanRemoteClusterConfiguration(); + } + + public InfinispanRemoteClusterService(InfinispanRemoteClusterConfiguration configuration) { + this.configuration = configuration.clone(); + } + + // ********************************************* + // Properties + // ********************************************* + + public InfinispanRemoteClusterConfiguration getConfiguration() { + return configuration; + } + + public void setConfiguration(InfinispanRemoteClusterConfiguration configuration) { + this.configuration = configuration.clone(); + } + + public void setConfigurationUri(String configurationUri) { + configuration.setConfigurationUri(configurationUri); + } + + public RemoteCacheManager getCacheContainer() { + return configuration.getCacheContainer(); + } + + public void setCacheContainer(RemoteCacheManager cacheContainer) { + configuration.setCacheContainer(cacheContainer); + } + + public Configuration getCacheContainerConfiguration() { + return configuration.getCacheContainerConfiguration(); + } + + public void setCacheContainerConfiguration(Configuration cacheContainerConfiguration) { + configuration.setCacheContainerConfiguration(cacheContainerConfiguration); + } + + public long getLifespan() { + return configuration.getLifespan(); + } + + public void setLifespan(long lifespan) { + configuration.setLifespan(lifespan); + } + + public TimeUnit getLifespanTimeUnit() { + return configuration.getLifespanTimeUnit(); + } + + public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) { + configuration.setLifespanTimeUnit(lifespanTimeUnit); + } + + // ********************************************* + // Impl + // ********************************************* + + @Override + protected InfinispanClusterView createView(String namespace) throws Exception { + // Validate parameters + ObjectHelper.notNull(getCamelContext(), "Camel Context"); + ObjectHelper.notNull(getId(), "Cluster ID"); + + return new InfinispanRemoteClusterView(this, configuration, namespace); + } +} diff --git a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterView.java b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterView.java new file mode 100644 index 0000000..a0dd216 --- /dev/null +++ b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterView.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.remote.cluster; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import org.apache.camel.cluster.CamelClusterMember; +import org.apache.camel.component.infinispan.InfinispanUtil; +import org.apache.camel.component.infinispan.cluster.InfinispanClusterService; +import org.apache.camel.component.infinispan.cluster.InfinispanClusterView; +import org.apache.camel.component.infinispan.remote.InfinispanRemoteManager; +import org.apache.camel.support.service.ServiceHelper; +import org.apache.camel.support.service.ServiceSupport; +import org.infinispan.client.hotrod.Flag; +import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved; +import org.infinispan.client.hotrod.annotation.ClientListener; +import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent; +import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.util.function.Predicates.negate; + +public class InfinispanRemoteClusterView extends InfinispanClusterView { + private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanRemoteClusterService.class); + + private final InfinispanRemoteClusterConfiguration configuration; + private final InfinispanRemoteManager manager; + private final LocalMember localMember; + private final LeadershipService leadership; + + private RemoteCache<String, String> cache; + + protected InfinispanRemoteClusterView( + InfinispanRemoteClusterService cluster, + InfinispanRemoteClusterConfiguration configuration, + String namespace) { + super(cluster, namespace); + + this.configuration = configuration; + this.manager = new InfinispanRemoteManager(this.configuration.getConfiguration()); + this.leadership = new LeadershipService(); + this.localMember = new LocalMember(cluster.getId()); + } + + @SuppressWarnings("unchecked") + @Override + public void doStart() throws Exception { + super.doStart(); + + ServiceHelper.startService(manager); + + this.cache = manager.getCache(getNamespace(), RemoteCache.class); + + ServiceHelper.startService(leadership); + } + + @Override + public void doStop() throws Exception { + super.doStop(); + + LOGGER.info("shutdown service: {}", getClusterService().getId()); + + ServiceHelper.stopService(leadership); + ServiceHelper.stopService(manager); + + this.cache = null; + } + + @Override + public CamelClusterMember getLocalMember() { + return this.localMember; + } + + @Override + public List<CamelClusterMember> getMembers() { + return this.cache != null + ? cache.keySet().stream() + .filter(negate(InfinispanClusterService.LEADER_KEY::equals)) + .map(ClusterMember::new) + .collect(Collectors.toList()) + : Collections.emptyList(); + } + + @Override + public Optional<CamelClusterMember> getLeader() { + if (this.cache == null) { + return Optional.empty(); + } + + String id = cache.get(InfinispanClusterService.LEADER_KEY); + if (id == null) { + return Optional.empty(); + } + + return Optional.of(new ClusterMember(id)); + } + + @Override + protected boolean isLeader(String id) { + if (this.cache == null) { + return false; + } + if (id == null) { + return false; + } + + final String key = InfinispanClusterService.LEADER_KEY; + final String val = this.cache.get(key); + + return Objects.equals(id, val); + } + + // ***************************************** + // + // Service + // + // ***************************************** + + @ClientListener + private final class LeadershipService extends ServiceSupport { + private final int lifespan; + private final AtomicBoolean running; + + private ScheduledExecutorService executorService; + private Long version; + + LeadershipService() { + this.lifespan = (int) configuration.getLifespanTimeUnit().toSeconds(configuration.getLifespan()); + this.running = new AtomicBoolean(false); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + this.running.set(true); + this.executorService = InfinispanUtil.newSingleThreadScheduledExecutor( + getCamelContext(), + this, + getLocalMember().getId()); + + // register the local member to the inventory + cache.put( + getLocalMember().getId(), + "false", + configuration.getLifespan(), + configuration.getLifespanTimeUnit()); + + cache.addClientListener(this); + + executorService.scheduleAtFixedRate( + this::run, + 0, + configuration.getLifespan() / 2, + configuration.getLifespanTimeUnit()); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + this.running.set(false); + + if (cache != null) { + cache.removeClientListener(this); + } + + getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService); + + if (cache != null) { + if (this.version != null) { + cache.removeWithVersion(InfinispanClusterService.LEADER_KEY, this.version); + } + + LOGGER.info("Removing local member, key={}", getLocalMember().getId()); + cache.remove(getLocalMember().getId()); + } + + this.version = null; + } + + private boolean isLeader() { + return getLocalMember().isLeader(); + } + + private void setLeader(boolean leader) { + ((LocalMember) getLocalMember()).setLeader(leader); + } + + private synchronized void run() { + if (!running.get()) { + return; + } + + final String leaderKey = InfinispanClusterService.LEADER_KEY; + final String localId = getLocalMember().getId(); + + if (isLeader() && version != null) { + LOGGER.debug("Lock refresh key={}, id{} with version={}", leaderKey, localId, version); + + // I'm still the leader, so refresh the key so it does not expire. + if (!cache.replaceWithVersion( + leaderKey, + getClusterService().getId(), + version, + lifespan)) { + + LOGGER.debug("Failed to refresh the lock key={}, id={}, version={}", leaderKey, localId, version); + + setLeader(false); + } else { + version = cache.getWithMetadata(leaderKey).getVersion(); + + LOGGER.debug("Lock refreshed key={}, ud={}, with new version={}", leaderKey, localId, version); + } + } + + if (!isLeader()) { + LOGGER.debug("Try to acquire lock key={}, id={}", leaderKey, localId); + + Object result = cache.withFlags(Flag.FORCE_RETURN_VALUE) + .putIfAbsent( + leaderKey, + localId, + configuration.getLifespan(), + configuration.getLifespanTimeUnit()); + + if (result == null) { + // Acquired the key so I'm the leader. + setLeader(true); + + // Get the version + version = cache.getWithMetadata(leaderKey).getVersion(); + + LOGGER.debug("Lock acquired key={}, id={}, with version={}", leaderKey, localId, version); + + } else if (Objects.equals(getClusterService().getId(), result) && !isLeader()) { + // Hey, I may have recovered from failure (or reboot was really + // fast) and my key was still there so yeah, I'm the leader again! + setLeader(true); + + // Get the version + version = cache.getWithMetadata(leaderKey).getVersion(); + + LOGGER.debug("Lock resumed key={}, id={} with version={}", leaderKey, localId, version); + } else { + LOGGER.debug("Failed to acquire the lock key={}, id={}", leaderKey, localId); + + setLeader(false); + } + } + + // refresh local membership + cache.put( + getLocalMember().getId(), + isLeader() ? "true" : "false", + configuration.getLifespan(), + configuration.getLifespanTimeUnit()); + } + + @ClientCacheEntryRemoved + public void onCacheEntryRemoved(ClientCacheEntryRemovedEvent<String> event) { + if (!running.get()) { + return; + } + + LOGGER.debug("onCacheEntryRemoved id={}, lock-key={}, event-key={}", + getLocalMember().getId(), + InfinispanClusterService.LEADER_KEY, + event.getKey()); + + if (Objects.equals(InfinispanClusterService.LEADER_KEY, event.getKey())) { + executorService.execute(this::run); + } + } + + @ClientCacheEntryExpired + public void onCacheEntryExpired(ClientCacheEntryExpiredEvent<String> event) { + if (!running.get()) { + return; + } + + LOGGER.debug("onCacheEntryExpired id={}, lock-key={}, event-key={}", + getLocalMember().getId(), + InfinispanClusterService.LEADER_KEY, + event.getKey()); + + if (Objects.equals(InfinispanClusterService.LEADER_KEY, event.getKey())) { + executorService.execute(this::run); + } + } + } +} diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicyTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicyTest.java deleted file mode 100644 index f09887d..0000000 --- a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicyTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.infinispan.remote; - -import java.util.concurrent.TimeUnit; - -import org.apache.camel.CamelContext; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.infinispan.InfinispanRoutePolicy; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.test.infra.infinispan.services.InfinispanService; -import org.apache.camel.test.infra.infinispan.services.InfinispanServiceFactory; -import org.infinispan.client.hotrod.RemoteCacheManager; -import org.infinispan.client.hotrod.configuration.ConfigurationBuilder; -import org.infinispan.configuration.cache.CacheMode; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class InfinispanRemoteRoutePolicyTest { - public static final String CACHE_NAME = "_route_policy"; - - @RegisterExtension - public static InfinispanService service = InfinispanServiceFactory.createService(); - - @Test - public void testLeadership() throws Exception { - try (RemoteCacheManager cacheContainer = createCacheContainer(service)) { - - cacheContainer.start(); - - final InfinispanRoutePolicy policy1 = createRoutePolicy(cacheContainer, "route1"); - final InfinispanRoutePolicy policy2 = createRoutePolicy(cacheContainer, "route2"); - - try (CamelContext context = new DefaultCamelContext()) { - context.start(); - - RouteBuilder.addRoutes(context, b -> b.from("direct:r1").routePolicy(policy1).to("mock:p1")); - - await().atMost(10, TimeUnit.SECONDS).until(policy1::isLeader); - - RouteBuilder.addRoutes(context, b -> b.from("direct:r2").routePolicy(policy2).to("mock:p2")); - - assertTrue(policy1.isLeader()); - assertFalse(policy2.isLeader()); - - policy1.shutdown(); - - await().atMost(10, TimeUnit.SECONDS).until(policy2::isLeader); - - assertFalse(policy1.isLeader()); - assertTrue(policy2.isLeader()); - } - } - } - - // ***************************** - // - // ***************************** - - private static RemoteCacheManager createCacheContainer(InfinispanService service) { - ConfigurationBuilder clientBuilder = new ConfigurationBuilder(); - - // for default tests, we force return value for all the - // operations - clientBuilder - .forceReturnValues(true); - - // add server from the test infra service - clientBuilder - .addServer() - .host(service.host()) - .port(service.port()); - - // add security info - clientBuilder - .security() - .authentication() - .username(service.username()) - .password(service.password()) - .serverName("infinispan") - .saslMechanism("DIGEST-MD5") - .realm("default"); - - RemoteCacheManager cacheContainer = new RemoteCacheManager(clientBuilder.build()); - cacheContainer.administration() - .getOrCreateCache( - CACHE_NAME, - new org.infinispan.configuration.cache.ConfigurationBuilder() - .clustering() - .cacheMode(CacheMode.DIST_SYNC).build()); - - return cacheContainer; - } - - private static InfinispanRoutePolicy createRoutePolicy(RemoteCacheManager cacheContainer, String lockValue) { - InfinispanRemoteConfiguration configuration = new InfinispanRemoteConfiguration(); - configuration.setCacheContainer(cacheContainer); - - InfinispanRemoteRoutePolicy policy = new InfinispanRemoteRoutePolicy(configuration); - policy.setLockMapName(CACHE_NAME); - policy.setLockKey("lock-key"); - policy.setLockValue(lockValue); - - return policy; - } -} diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/AbstractInfinispanRemoteClusteredTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/AbstractInfinispanRemoteClusteredTest.java new file mode 100644 index 0000000..aca2ea8 --- /dev/null +++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/AbstractInfinispanRemoteClusteredTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.remote.cluster; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.camel.test.infra.infinispan.services.InfinispanService; +import org.apache.camel.test.infra.infinispan.services.InfinispanServiceFactory; +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.infinispan.client.hotrod.configuration.Configuration; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.infinispan.remote.cluster.InfinispanRemoteClusteredTestSupport.createCache; +import static org.apache.camel.component.infinispan.remote.cluster.InfinispanRemoteClusteredTestSupport.createConfiguration; +import static org.assertj.core.api.Assertions.assertThat; + +abstract class AbstractInfinispanRemoteClusteredTest { + @RegisterExtension + public static InfinispanService service = InfinispanServiceFactory.createService(); + + @Timeout(value = 1, unit = TimeUnit.MINUTES) + @Test + public void test() throws Exception { + final Logger logger = LoggerFactory.getLogger(getClass()); + final List<String> clients = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList()); + final List<String> results = new ArrayList<>(); + final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(clients.size() * 2); + final CountDownLatch latch = new CountDownLatch(clients.size()); + final String viewName = "myView"; + + Configuration configuration = createConfiguration(service); + + try (RemoteCacheManager cacheContainer = new RemoteCacheManager(configuration)) { + createCache(cacheContainer, viewName); + + for (String id : clients) { + scheduler.submit(() -> { + try { + run(cacheContainer, viewName, id); + logger.debug("Node {} is shutting down", id); + results.add(id); + } catch (Exception e) { + logger.warn("", e); + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + scheduler.shutdownNow(); + + assertThat(results).hasSameSizeAs(clients); + assertThat(results).containsAll(clients); + } + } + + protected abstract void run(RemoteCacheManager cacheContainer, String namespace, String id) throws Exception; +} diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredMasterTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredMasterTest.java new file mode 100644 index 0000000..73621cc --- /dev/null +++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredMasterTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.remote.cluster; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.infinispan.client.hotrod.RemoteCacheManager; + +public class InfinispanRemoteClusteredMasterTest extends AbstractInfinispanRemoteClusteredTest { + @Override + protected void run(RemoteCacheManager cacheContainer, String namespace, String id) throws Exception { + final int events = ThreadLocalRandom.current().nextInt(2, 6); + final CountDownLatch contextLatch = new CountDownLatch(events); + + //Set up a single node cluster. + InfinispanRemoteClusterService clusterService = new InfinispanRemoteClusterService(); + clusterService.setCacheContainer(cacheContainer); + clusterService.setId("node-" + id); + + try (DefaultCamelContext context = new DefaultCamelContext()) { + context.disableJMX(); + context.setName("context-" + id); + context.addService(clusterService); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + fromF("master:%s:timer:%s?delay=1000&period=1000&repeatCount=%d", namespace, id, events) + .routeId("route-" + id) + .log("From id=${routeId} counter=${header.CamelTimerCounter}") + .process(e -> contextLatch.countDown()); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + } + } +} diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyFactoryTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyFactoryTest.java new file mode 100644 index 0000000..bbbc1d1 --- /dev/null +++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyFactoryTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.remote.cluster; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.cluster.ClusteredRoutePolicyFactory; +import org.infinispan.client.hotrod.RemoteCacheManager; + +public class InfinispanRemoteClusteredRoutePolicyFactoryTest extends AbstractInfinispanRemoteClusteredTest { + @Override + protected void run(RemoteCacheManager cacheContainer, String namespace, String id) throws Exception { + final int events = ThreadLocalRandom.current().nextInt(2, 6); + final CountDownLatch contextLatch = new CountDownLatch(events); + + //Set up a single node cluster. + InfinispanRemoteClusterService clusterService = new InfinispanRemoteClusterService(); + clusterService.setCacheContainer(cacheContainer); + clusterService.setId("node-" + id); + + try (DefaultCamelContext context = new DefaultCamelContext()) { + context.disableJMX(); + context.setName("context-" + id); + context.addService(clusterService); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace(namespace)); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + fromF("timer:%s?delay=1000&period=1000&repeatCount=%d", id, events) + .routeId("route-" + id) + .log("From id=${routeId} counter=${header.CamelTimerCounter}") + .process(e -> contextLatch.countDown()); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + } + } +} diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyTest.java new file mode 100644 index 0000000..5dcafca --- /dev/null +++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.remote.cluster; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.cluster.ClusteredRoutePolicy; +import org.infinispan.client.hotrod.RemoteCacheManager; + +public class InfinispanRemoteClusteredRoutePolicyTest extends AbstractInfinispanRemoteClusteredTest { + @Override + protected void run(RemoteCacheManager cacheContainer, String namespace, String id) throws Exception { + final int events = ThreadLocalRandom.current().nextInt(2, 6); + final CountDownLatch contextLatch = new CountDownLatch(events); + + //Set up a single node cluster. + InfinispanRemoteClusterService clusterService = new InfinispanRemoteClusterService(); + clusterService.setCacheContainer(cacheContainer); + clusterService.setId("node-" + id); + + try (DefaultCamelContext context = new DefaultCamelContext()) { + context.disableJMX(); + context.setName("context-" + id); + context.addService(clusterService); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + fromF("timer:%s?delay=1000&period=1000&repeatCount=%d", id, events) + .routeId("route-" + id) + .routePolicy(ClusteredRoutePolicy.forNamespace(namespace)) + .log("From id=${routeId} counter=${header.CamelTimerCounter}") + .process(e -> contextLatch.countDown()); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + } + } +} diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredTestSupport.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredTestSupport.java new file mode 100644 index 0000000..a78ab82 --- /dev/null +++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredTestSupport.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.remote.cluster; + +import org.apache.camel.test.infra.infinispan.services.InfinispanService; +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.infinispan.client.hotrod.configuration.Configuration; +import org.infinispan.client.hotrod.configuration.ConfigurationBuilder; +import org.infinispan.configuration.cache.CacheMode; + +public final class InfinispanRemoteClusteredTestSupport { + private InfinispanRemoteClusteredTestSupport() { + } + + public static Configuration createConfiguration(InfinispanService service) { + return new ConfigurationBuilder() + .addServer() + .host(service.host()) + .port(service.port()) + .security() + .authentication() + .username(service.username()) + .password(service.password()) + .serverName("infinispan") + .saslMechanism("DIGEST-MD5") + .realm("default") + .build(); + + } + + public static void createCache(RemoteCacheManager cacheContainer, String cacheName) { + cacheContainer.administration() + .getOrCreateCache( + cacheName, + new org.infinispan.configuration.cache.ConfigurationBuilder() + .clustering() + .cacheMode(CacheMode.DIST_SYNC).build()); + } +} diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredViewTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredViewTest.java new file mode 100644 index 0000000..dd76a64 --- /dev/null +++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredViewTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.remote.cluster; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.cluster.CamelClusterMember; +import org.apache.camel.cluster.CamelClusterView; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.test.infra.infinispan.services.InfinispanService; +import org.apache.camel.test.infra.infinispan.services.InfinispanServiceFactory; +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.infinispan.client.hotrod.configuration.Configuration; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import static org.apache.camel.component.infinispan.remote.cluster.InfinispanRemoteClusteredTestSupport.createCache; +import static org.apache.camel.component.infinispan.remote.cluster.InfinispanRemoteClusteredTestSupport.createConfiguration; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class InfinispanRemoteClusteredViewTest { + @RegisterExtension + public static InfinispanService service = InfinispanServiceFactory.createService(); + + @Test + public void getLeaderTest() throws Exception { + final String viewName = "myView"; + + Configuration configuration = createConfiguration(service); + + try (RemoteCacheManager cacheContainer = new RemoteCacheManager(configuration)) { + createCache(cacheContainer, viewName); + + InfinispanRemoteClusterService clusterService = new InfinispanRemoteClusterService(); + clusterService.setCacheContainer(cacheContainer); + clusterService.setId("node"); + + //Set up context with single locked route. + try (DefaultCamelContext context = new DefaultCamelContext()) { + context.disableJMX(); + context.addService(clusterService); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + fromF("master:%s:timer:infinispan?repeatCount=1", viewName) + .routeId("route1") + .stop(); + } + }); + + context.start(); + + CamelClusterView view = clusterService.getView(viewName); + + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(view.getLeader()) + .get() + .satisfies(CamelClusterMember::isLeader) + .satisfies(CamelClusterMember::isLocal); + }); + } + } + } +} diff --git a/components/camel-infinispan/camel-infinispan/src/test/resources/log4j2.properties b/components/camel-infinispan/camel-infinispan/src/test/resources/log4j2.properties index 3847970..5a3ff87 100644 --- a/components/camel-infinispan/camel-infinispan/src/test/resources/log4j2.properties +++ b/components/camel-infinispan/camel-infinispan/src/test/resources/log4j2.properties @@ -34,6 +34,8 @@ logger.infinispan-camel.name = org.apache.camel.component.infinispan logger.infinispan-camel.level = INFO logger.infinispan-camel-remote.name = org.apache.camel.component.infinispan.remote logger.infinispan-camel-remote.level = INFO +logger.infinispan-camel-remote-cluster.name = org.apache.camel.component.infinispan.remote.cluster +logger.infinispan-camel-remote-cluster.level = DEBUG logger.infinispan-camel-embedded.name = org.apache.camel.component.infinispan.embedded logger.infinispan-test-infra-container.name = container.infinispan logger.infinispan-test-infra-container.level = WARN