Refine Cluster Service API and implementations
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8ef99f5c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8ef99f5c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8ef99f5c Branch: refs/heads/master Commit: 8ef99f5cffb268e4fd838b66f5dbcd4cc841c776 Parents: 24cb543 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Sat Sep 23 17:40:57 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Mon Sep 25 14:21:01 2017 +0200 ---------------------------------------------------------------------- .../camel/ha/CamelClusterEventListener.java | 6 +- .../org/apache/camel/ha/CamelClusterMember.java | 5 ++ .../camel/impl/ha/AbstractCamelClusterView.java | 3 +- .../camel/impl/ha/ClusteredRoutePolicy.java | 3 +- .../atomix/ha/AtomixClusterService.java | 4 +- .../component/atomix/ha/AtomixClusterView.java | 35 ++++++++--- .../ha/AtomixClientRoutePolicyTestSupport.java | 18 +++--- .../atomix/ha/AtomixRoutePolicyTest.java | 20 +++---- .../src/test/resources/log4j2.properties | 4 ++ components/camel-consul/pom.xml | 2 +- .../component/consul/ha/ConsulClusterView.java | 37 ++++++++---- .../consul/ha/ConsulClusteredRoutePolicyIT.java | 18 +++--- .../src/test/resources/log4j2.properties | 16 ++++- .../kubernetes/ha/KubernetesClusterView.java | 8 ++- .../ha/lock/KubernetesLockConfiguration.java | 2 +- .../kubernetes/ha/utils/LeaderRecorder.java | 5 +- .../zookeeper/ha/ZooKeeperClusterView.java | 61 ++++++++++++-------- .../ha/ZooKeeperClusteredRoutePolicyTest.java | 18 +++--- .../src/test/resources/log4j2.properties | 12 ++-- .../AtomixClusterServiceAutoConfiguration.java | 8 +-- .../AtomixClusterServiceConfiguration.java | 2 +- .../ConsulClusterServiceAutoConfiguration.java | 2 +- .../ha/ConsulClusterServiceConfiguration.java | 2 +- ...ooKeeperClusterServiceAutoConfiguration.java | 2 +- .../ZooKeeperClusterServiceConfiguration.java | 2 +- 25 files changed, 188 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/camel-core/src/main/java/org/apache/camel/ha/CamelClusterEventListener.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterEventListener.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterEventListener.java index 1a972ca..8fcd4d6 100644 --- a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterEventListener.java +++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterEventListener.java @@ -16,6 +16,8 @@ */ package org.apache.camel.ha; +import java.util.Optional; + /** * Marker interface for cluster events */ @@ -27,9 +29,9 @@ public interface CamelClusterEventListener { * Notify a change in the leadership for a particular cluster. * * @param view the cluster view - * @param leader the new leader or null (when there are no active leaders) + * @param leader the optional new leader */ - void leadershipChanged(CamelClusterView view, CamelClusterMember leader); + void leadershipChanged(CamelClusterView view, Optional<CamelClusterMember> leader); } http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/camel-core/src/main/java/org/apache/camel/ha/CamelClusterMember.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterMember.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterMember.java index 285c7aa..951ea9e 100644 --- a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterMember.java +++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterMember.java @@ -23,4 +23,9 @@ public interface CamelClusterMember extends HasId { * @return true if this member is the master. */ boolean isLeader(); + + /** + * @return true if this member is local. + */ + boolean isLocal(); } http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java index 1605c6d..8a060c6 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java @@ -18,6 +18,7 @@ package org.apache.camel.impl.ha; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.locks.StampedLock; import java.util.function.Consumer; @@ -92,7 +93,7 @@ public abstract class AbstractCamelClusterView extends ServiceSupport implements ); } - protected void fireLeadershipChangedEvent(CamelClusterMember leader) { + protected void fireLeadershipChangedEvent(Optional<CamelClusterMember> leader) { doWithListener( CamelClusterEventListener.Leadership.class, listener -> listener.leadershipChanged(this, leader) http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java index 26cee62..8e923d1 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java @@ -19,6 +19,7 @@ package org.apache.camel.impl.ha; import java.time.Duration; import java.util.EventObject; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -262,7 +263,7 @@ public class ClusteredRoutePolicy extends RoutePolicySupport implements CamelCon private class CamelClusterLeadershipListener implements CamelClusterEventListener.Leadership { @Override - public void leadershipChanged(CamelClusterView view, CamelClusterMember leader) { + public void leadershipChanged(CamelClusterView view, Optional<CamelClusterMember> leader) { setLeader(clusterView.getLocalMember().isLeader()); } } http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java index b2315ae..1937a82 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java @@ -147,8 +147,8 @@ public final class AtomixClusterService extends AbstractCamelClusterService<Atom super.doStop(); if (atomix != null) { - LOGGER.debug("Shutdown atomix replica {}", atomix); - atomix.shutdown().join(); + LOGGER.debug("Leaving atomix cluster replica {}", atomix); + atomix.leave().join(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java index be24d64..91367e9 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java @@ -95,21 +95,21 @@ final class AtomixClusterView extends AbstractCamelClusterView { LOGGER.debug("Listen election events"); group.election().onElection(term -> { if (isRunAllowed()) { - fireLeadershipChangedEvent(new AtomixClusterMember(term.leader())); + fireLeadershipChangedEvent(Optional.of(toClusterMember(term.leader()))); } }); LOGGER.debug("Listen join events"); group.onJoin(member -> { if (isRunAllowed()) { - fireMemberAddedEvent(new AtomixClusterMember(member)); + fireMemberAddedEvent(toClusterMember(member)); } }); LOGGER.debug("Listen leave events"); group.onLeave(member -> { if (isRunAllowed()) { - fireMemberRemovedEvent(new AtomixClusterMember(member)); + fireMemberRemovedEvent(toClusterMember(member)); } }); @@ -123,11 +123,17 @@ final class AtomixClusterView extends AbstractCamelClusterView { localMember.leave(); } + protected CamelClusterMember toClusterMember(GroupMember member) { + return localMember != null && localMember.is(member) + ? localMember + : new AtomixClusterMember(member); + } + // *********************************************** // // *********************************************** - class AtomixLocalMember implements CamelClusterMember { + final class AtomixLocalMember implements CamelClusterMember { private LocalMember member; @Override @@ -153,6 +159,17 @@ final class AtomixClusterView extends AbstractCamelClusterView { return member.equals(group.election().term().leader()); } + @Override + public boolean isLocal() { + return true; + } + + boolean is(GroupMember member) { + return this.member != null + ? this.member.equals(member) + : false; + } + boolean hasJoined() { return member != null; } @@ -183,8 +200,7 @@ final class AtomixClusterView extends AbstractCamelClusterView { group.remove(id).join(); member = null; - - fireLeadershipChangedEvent(null); + fireLeadershipChangedEvent(Optional.empty()); } return this; @@ -199,7 +215,7 @@ final class AtomixClusterView extends AbstractCamelClusterView { } } - class AtomixClusterMember implements CamelClusterMember { + final class AtomixClusterMember implements CamelClusterMember { private final GroupMember member; AtomixClusterMember(GroupMember member) { @@ -224,6 +240,11 @@ final class AtomixClusterView extends AbstractCamelClusterView { } @Override + public boolean isLocal() { + return localMember != null ? localMember.is(member) : false; + } + + @Override public String toString() { final StringBuilder sb = new StringBuilder("AtomixClusterMember{"); sb.append("group=").append(group); http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java index bf425df..d5b0f8b 100644 --- a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java @@ -81,7 +81,8 @@ public abstract class AtomixClientRoutePolicyTestSupport { private void run(String id) { try { - CountDownLatch contextLatch = new CountDownLatch(1); + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); DefaultCamelContext context = new DefaultCamelContext(); context.disableJMX(); @@ -91,15 +92,10 @@ public abstract class AtomixClientRoutePolicyTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("timer:atomix?delay=1s&period=1s&repeatCount=1") + from("timer:atomix?delay=1s&period=1s") .routeId("route-" + id) - .process(e -> { - LOGGER.debug("Node {} done", id); - results.add(id); - // Shutdown the context later on to give a chance to - // other members to catch-up - scheduler.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS); - }); + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); } }); @@ -109,6 +105,10 @@ public abstract class AtomixClientRoutePolicyTestSupport { context.start(); contextLatch.await(); + + LOGGER.debug("Shutting down client node {}", id); + results.add(id); + context.stop(); latch.countDown(); http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java index 8e23d5d..e730291 100644 --- a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java @@ -47,7 +47,7 @@ public final class AtomixRoutePolicyTest { ); private final Set<Address> results = new HashSet<>(); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(addresses.size() * 2); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(addresses.size()); private final CountDownLatch latch = new CountDownLatch(addresses.size()); // ************************************ @@ -73,7 +73,8 @@ public final class AtomixRoutePolicyTest { private void run(Address address) { try { - CountDownLatch contextLatch = new CountDownLatch(1); + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); AtomixClusterService service = new AtomixClusterService(); service.setId("node-" + address.port()); @@ -89,15 +90,10 @@ public final class AtomixRoutePolicyTest { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("timer:atomix?delay=1s&period=1s&repeatCount=1") + from("timer:atomix?delay=1s&period=1s") .routeId("route-" + address.port()) - .process(e -> { - LOGGER.debug("Node {} done", address); - results.add(address); - // Shutdown the context later on to give a chance to - // other members to catch-up - scheduler.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS); - }); + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); } }); @@ -107,6 +103,10 @@ public final class AtomixRoutePolicyTest { context.start(); contextLatch.await(); + + LOGGER.debug("Shutting down node {}", address); + results.add(address); + context.stop(); latch.countDown(); http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-atomix/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/resources/log4j2.properties b/components/camel-atomix/src/test/resources/log4j2.properties index d424d7e..e11e374 100644 --- a/components/camel-atomix/src/test/resources/log4j2.properties +++ b/components/camel-atomix/src/test/resources/log4j2.properties @@ -27,6 +27,10 @@ appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n logger.atomix.name = io.atomix logger.atomix.level = INFO +logger.atomix-copycat.name = io.atomix.copycat +logger.atomix-copycat.level = WARN +logger.atomix-catalyst.name = io.atomix.catalyst +logger.atomix-catalyst.level = WARN logger.camel.name = org.apache.camel logger.camel.level = INFO http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-consul/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-consul/pom.xml b/components/camel-consul/pom.xml index 914e751..8133b47 100644 --- a/components/camel-consul/pom.xml +++ b/components/camel-consul/pom.xml @@ -163,7 +163,7 @@ <images> <image> <name>consul:latest</name> - <alias>consul</alias> + <alias>0.9.3</alias> <run> <ports> <port>consul.port:8500</port> http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java index ae84168..c5971461 100644 --- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java @@ -35,6 +35,7 @@ import com.orbitz.consul.model.session.SessionInfo; import com.orbitz.consul.option.QueryOptions; import org.apache.camel.ha.CamelClusterMember; import org.apache.camel.impl.ha.AbstractCamelClusterView; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +44,6 @@ final class ConsulClusterView extends AbstractCamelClusterView { private final ConsulClusterConfiguration configuration; private final ConsulLocalMember localMember; - private final ConsulClusterMember nullMember; private final AtomicReference<String> sessionId; private final Watcher watcher; @@ -57,7 +57,6 @@ final class ConsulClusterView extends AbstractCamelClusterView { this.configuration = configuration; this.localMember = new ConsulLocalMember(); - this.nullMember = new ConsulClusterMember(); this.sessionId = new AtomicReference<>(); this.watcher = new Watcher(); this.path = configuration.getRootPath() + "/" + namespace; @@ -126,17 +125,21 @@ final class ConsulClusterView extends AbstractCamelClusterView { LOGGER.debug("Successfully released lock on path '{}' with id '{}'", path, sessionId.get()); } - sessionClient.destroySession(sessionId.getAndSet(null)); - localMember.setMaster(false); + synchronized (sessionId) { + sessionClient.destroySession(sessionId.getAndSet(null)); + localMember.setMaster(false); + } } } private boolean acquireLock() { - String sid = sessionId.get(); + synchronized (sessionId) { + String sid = sessionId.get(); - return (sid != null) - ? keyValueClient.acquireLock(this.path, sid) - : false; + return (sid != null) + ? sessionClient.getSessionInfo(sid).transform(si -> keyValueClient.acquireLock(path, sid)).or(Boolean.FALSE) + : false; + } } // *********************************************** @@ -149,12 +152,12 @@ final class ConsulClusterView extends AbstractCamelClusterView { void setMaster(boolean master) { if (master && this.master.compareAndSet(false, true)) { LOGGER.debug("Leadership taken for session id {}", sessionId.get()); - fireLeadershipChangedEvent(this); + fireLeadershipChangedEvent(Optional.of(this)); return; } if (!master && this.master.compareAndSet(true, false)) { LOGGER.debug("Leadership lost for session id {}", sessionId.get()); - fireLeadershipChangedEvent(getMaster().orElse(nullMember)); + fireLeadershipChangedEvent(getMaster()); return; } } @@ -165,6 +168,11 @@ final class ConsulClusterView extends AbstractCamelClusterView { } @Override + public boolean isLocal() { + return true; + } + + @Override public String getId() { return sessionId.get(); } @@ -210,6 +218,15 @@ final class ConsulClusterView extends AbstractCamelClusterView { } @Override + public boolean isLocal() { + if (id == null) { + return false; + } + + return ObjectHelper.equal(id, localMember.getId()); + } + + @Override public String toString() { return "ConsulClusterMember{" + "id='" + id + '\'' http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java index effa2f3..2a79682 100644 --- a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java +++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java @@ -67,7 +67,8 @@ public class ConsulClusteredRoutePolicyIT { private static void run(String id) { try { - CountDownLatch contextLatch = new CountDownLatch(1); + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); ConsulClusterService service = new ConsulClusterService(); service.setId("node-" + id); @@ -83,15 +84,10 @@ public class ConsulClusteredRoutePolicyIT { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("timer:consul?delay=1s&period=1s&repeatCount=1") + from("timer:consul?delay=1s&period=1s") .routeId("route-" + id) - .process(e -> { - LOGGER.debug("Node {} done", id); - RESULTS.add(id); - // Shutdown the context later on to give a chance to - // other members to catch-up - SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS); - }); + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); } }); @@ -101,6 +97,10 @@ public class ConsulClusteredRoutePolicyIT { context.start(); contextLatch.await(); + + LOGGER.debug("Shutting down node {}", id); + RESULTS.add(id); + context.stop(); LATCH.countDown(); http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-consul/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/test/resources/log4j2.properties b/components/camel-consul/src/test/resources/log4j2.properties index c2510bc..a4da2bf 100644 --- a/components/camel-consul/src/test/resources/log4j2.properties +++ b/components/camel-consul/src/test/resources/log4j2.properties @@ -24,9 +24,18 @@ appender.out.type = Console appender.out.name = out appender.out.layout.type = PatternLayout appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n -logger.consul.name = org.apache.camel.component.consul -logger.consul.level = DEBUG +logger.atomix.name = io.atomix +logger.atomix.level = INFO +logger.atomix-copycat.name = io.atomix.copycat +logger.atomix-copycat.level = WARN +logger.atomix-catalyst.name = io.atomix.catalyst +logger.atomix-catalyst.level = WARN + +logger.camel.name = org.apache.camel +logger.camel.level = INFO +logger.camel-consul.name = org.apache.camel.component.consul +logger.camel-consul.level = DEBUG logger.camel-ha.name = org.apache.camel.ha logger.camel-ha.level = DEBUG logger.camel-impl-ha.name = org.apache.camel.impl.ha @@ -34,4 +43,5 @@ logger.camel-impl-ha.level = DEBUG rootLogger.level = INFO #rootLogger.appenderRef.stdout.ref = out -rootLogger.appenderRef.out.ref = file \ No newline at end of file +rootLogger.appenderRef.out.ref = file + http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java index 148f47d..23b4305 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java @@ -95,7 +95,7 @@ public class KubernetesClusterView extends AbstractCamelClusterView { // New leader Optional<String> leader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(event).getData(); currentLeader = leader.map(this::toMember); - fireLeadershipChangedEvent(currentLeader.orElse(null)); + fireLeadershipChangedEvent(currentLeader); } else if (event instanceof KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) { Set<String> members = KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent.class.cast(event).getData(); Set<String> oldMembers = currentMembers.stream().map(CamelClusterMember::getId).collect(Collectors.toSet()); @@ -153,11 +153,15 @@ public class KubernetesClusterView extends AbstractCamelClusterView { } @Override + public boolean isLocal() { + return ObjectHelper.equal(lockConfiguration.getPodName(), podName); + } + + @Override public String getId() { return podName; } - @Override public String toString() { final StringBuilder sb = new StringBuilder("KubernetesClusterMember{"); http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java index 69c54d9..94dcca8 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java @@ -44,7 +44,7 @@ public class KubernetesLockConfiguration implements Cloneable { private String configMapName = DEFAULT_CONFIGMAP_NAME; /** - * Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfgMap. + * Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfigMap. */ private String groupName; http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java index 7d7147b..0f8199d 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; @@ -41,9 +42,9 @@ public class LeaderRecorder implements CamelClusterEventListener.Leadership { private List<LeadershipInfo> leaderships = new CopyOnWriteArrayList<>(); @Override - public void leadershipChanged(CamelClusterView view, CamelClusterMember leader) { + public void leadershipChanged(CamelClusterView view, Optional<CamelClusterMember> leader) { LOG.info("Cluster view {} - leader changed to: {}", view.getLocalMember(), leader); - this.leaderships.add(new LeadershipInfo(leader != null ? leader.getId() : null, System.currentTimeMillis())); + this.leaderships.add(new LeadershipInfo(leader.map(CamelClusterMember::getId).orElse(null), System.currentTimeMillis())); } public List<LeadershipInfo> getLeadershipInfo() { http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java index 6310ded..78ceed5 100644 --- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java @@ -19,7 +19,6 @@ package org.apache.camel.component.zookeeper.ha; import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.apache.camel.RuntimeCamelException; @@ -27,6 +26,7 @@ import org.apache.camel.component.zookeeper.ZooKeeperCuratorConfiguration; import org.apache.camel.ha.CamelClusterMember; import org.apache.camel.ha.CamelClusterService; import org.apache.camel.impl.ha.AbstractCamelClusterView; +import org.apache.camel.util.ObjectHelper; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; @@ -40,7 +40,7 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView { private final ZooKeeperCuratorConfiguration configuration; private final CuratorFramework client; private final CuratorLocalMember localMember; - private LeaderSelector leaderSelector; + private volatile LeaderSelector leaderSelector; public ZooKeeperClusterView(CamelClusterService cluster, ZooKeeperCuratorConfiguration configuration, CuratorFramework client, String namespace) { super(cluster, namespace); @@ -57,12 +57,16 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView { @Override public Optional<CamelClusterMember> getMaster() { - if (leaderSelector == null) { + if (leaderSelector == null || isStoppingOrStopped()) { return Optional.empty(); } try { - return Optional.of(new CuratorClusterMember(leaderSelector.getLeader())); + Participant participant = leaderSelector.getLeader(); + + return ObjectHelper.equal(participant.getId(), localMember.getId()) + ? Optional.of(localMember) + : Optional.of(new CuratorClusterMember(participant)); } catch (Exception e) { throw new RuntimeCamelException(e); } @@ -97,19 +101,27 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView { protected void doStop() throws Exception { if (leaderSelector != null) { leaderSelector.interruptLeadership(); - leaderSelector.close(); - leaderSelector = null; + fireLeadershipChangedEvent(getMaster()); + } else { + leaderSelector.requeue(); + } + } - localMember.setMaster(false); - fireLeadershipChangedEvent(getMaster().orElse(null)); + @Override + protected void doShutdown() throws Exception { + if (leaderSelector != null) { + leaderSelector.close(); } } - class CamelLeaderElectionListener extends LeaderSelectorListenerAdapter { + // *********************************************** + // + // *********************************************** + + private final class CamelLeaderElectionListener extends LeaderSelectorListenerAdapter { @Override public void takeLeadership(CuratorFramework curatorFramework) throws Exception { - localMember.setMaster(true); - fireLeadershipChangedEvent(localMember); + fireLeadershipChangedEvent(Optional.of(localMember)); while (isRunAllowed()) { try { @@ -119,26 +131,20 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView { break; } } - - localMember.setMaster(false); - fireLeadershipChangedEvent(getMaster().orElse(null)); + + fireLeadershipChangedEvent(getMaster()); } } - // *********************************************** - // - // *********************************************** - private final class CuratorLocalMember implements CamelClusterMember { - private AtomicBoolean master = new AtomicBoolean(false); - - void setMaster(boolean master) { - this.master.set(master); + @Override + public boolean isLeader() { + return leaderSelector != null ? leaderSelector.hasLeadership() : false; } @Override - public boolean isLeader() { - return master.get(); + public boolean isLocal() { + return true; } @Override @@ -160,6 +166,13 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView { } @Override + public boolean isLocal() { + return (participant.getId() != null) + ? ObjectHelper.equal(participant.getId(), localMember.getId()) + : false; + } + + @Override public boolean isLeader() { try { return leaderSelector.getLeader().equals(this.participant); http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java index 1c32787..6aa7469 100644 --- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java +++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java @@ -79,7 +79,8 @@ public final class ZooKeeperClusteredRoutePolicyTest { private static void run(String id) { try { - CountDownLatch contextLatch = new CountDownLatch(1); + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); ZooKeeperClusterService service = new ZooKeeperClusterService(); service.setId("node-" + id); @@ -94,15 +95,10 @@ public final class ZooKeeperClusteredRoutePolicyTest { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("timer:zookeeper?delay=1s&period=1s&repeatCount=1") + from("timer:zookeeper?delay=1s&period=1s") .routeId("route-" + id) - .process(e -> { - LOGGER.debug("Node {} done", id); - RESULTS.add(id); - // Shutdown the context later on to give a chance to - // other members to catch-up - SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS); - }); + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); } }); @@ -112,6 +108,10 @@ public final class ZooKeeperClusteredRoutePolicyTest { context.start(); contextLatch.await(); + + LOGGER.debug("Shutting down node {}", id); + RESULTS.add(id); + context.stop(); LATCH.countDown(); http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/components/camel-zookeeper/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/resources/log4j2.properties b/components/camel-zookeeper/src/test/resources/log4j2.properties index bab98a6..52dfed8 100644 --- a/components/camel-zookeeper/src/test/resources/log4j2.properties +++ b/components/camel-zookeeper/src/test/resources/log4j2.properties @@ -28,16 +28,19 @@ appender.out.layout.pattern = [%t] %c{1} %-5p %m%n logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = INFO + +logger.camel.name = org.apache.camel +logger.camel.level = INFO logger.camel-zookeeper.name = org.apache.camel.component.zookeeper logger.camel-zookeeper.level = INFO logger.camel-zookeeper-policy.name = org.apache.camel.component.zookeeper.policy logger.camel-zookeeper-policy.level = INFO +logger.camel-ha.name = org.apache.camel.ha +logger.camel-ha.level = DEBUG +logger.camel-impl-ha.name = org.apache.camel.impl.ha +logger.camel-impl-ha.level = DEBUG logger.camel-support.name = org.apache.camel.support logger.camel-support.level = INFO -logger.camel-ha-impl.name = org.apache.camel.impl.ha -logger.camel-ha-impl.level = DEBUG -logger.camel.name = org.apache.camel -logger.camel.level = INFO logger.springframework.name = org.springframework logger.springframework.level = WARN @@ -46,4 +49,3 @@ rootLogger.level = INFO #rootLogger.appenderRef.stdout.ref = out rootLogger.appenderRef.file.ref = file - http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceAutoConfiguration.java b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceAutoConfiguration.java index a6e1e6b..7d7c6cd 100644 --- a/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceAutoConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceAutoConfiguration.java @@ -47,7 +47,7 @@ public class AtomixClusterServiceAutoConfiguration { @Bean(initMethod = "start", destroyMethod = "stop") @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) - @ConditionalOnProperty(prefix = "camel.clustered.service.atomix", name = "mode", havingValue = "node") + @ConditionalOnProperty(prefix = "camel.component.atomix.cluster.service", name = "mode", havingValue = "node") public CamelClusterService atomixClusterService() { AtomixClusterService service = new AtomixClusterService(); service.setNodes(configuration.getNodes().stream().map(Address::new).collect(Collectors.toList())); @@ -64,7 +64,7 @@ public class AtomixClusterServiceAutoConfiguration { @Bean(initMethod = "start", destroyMethod = "stop") @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) - @ConditionalOnProperty(prefix = "camel.clustered.service.atomix", name = "mode", havingValue = "client") + @ConditionalOnProperty(prefix = "camel.component.atomix.cluster.service", name = "mode", havingValue = "client") public CamelClusterService atomixClusterClientService() { AtomixClusterClientService service = new AtomixClusterClientService(); service.setNodes(configuration.getNodes().stream().map(Address::new).collect(Collectors.toList())); @@ -84,11 +84,11 @@ public class AtomixClusterServiceAutoConfiguration { super(ConfigurationPhase.REGISTER_BEAN); } - @ConditionalOnProperty(prefix = "camel.clustered.service.atomix", name = "enabled") + @ConditionalOnProperty(prefix = "camel.component.atomix.cluster.service", name = "enabled") static class IfEnabled { } - @ConditionalOnProperty(prefix = "camel.clustered.service.atomix", name = "mode") + @ConditionalOnProperty(prefix = "camel.component.atomix.cluster.service", name = "mode") static class WithMode { } } http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceConfiguration.java b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceConfiguration.java index 27b281a..8317d56 100644 --- a/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceConfiguration.java @@ -22,7 +22,7 @@ import java.util.Set; import io.atomix.copycat.server.storage.StorageLevel; import org.springframework.boot.context.properties.ConfigurationProperties; -@ConfigurationProperties(prefix = "camel.clustered.service.atomix") +@ConfigurationProperties(prefix = "camel.component.atomix.cluster.service") public class AtomixClusterServiceConfiguration { enum Mode { node, http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceAutoConfiguration.java b/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceAutoConfiguration.java index bd0dd8c..082bb7b 100644 --- a/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceAutoConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceAutoConfiguration.java @@ -33,7 +33,7 @@ import org.springframework.context.annotation.Scope; @Configuration @AutoConfigureBefore({ ClusteredRouteControllerAutoConfiguration.class, CamelAutoConfiguration.class }) -@ConditionalOnProperty(prefix = "camel.clustered.service.consul", name = "enabled") +@ConditionalOnProperty(prefix = "camel.component.consul.cluster.service", name = "enabled") @EnableConfigurationProperties(ConsulClusterServiceConfiguration.class) public class ConsulClusterServiceAutoConfiguration { @Autowired http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceConfiguration.java b/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceConfiguration.java index d47aa71..bc619c2 100644 --- a/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceConfiguration.java @@ -19,7 +19,7 @@ package org.apache.camel.component.consul.springboot.ha; import org.apache.camel.component.consul.ha.ConsulClusterConfiguration; import org.springframework.boot.context.properties.ConfigurationProperties; -@ConfigurationProperties(prefix = "camel.clustered.service.consul") +@ConfigurationProperties(prefix = "camel.component.consul.cluster.service") public class ConsulClusterServiceConfiguration extends ConsulClusterConfiguration { /** * Sets if the zookeeper cluster service should be enabled or not, default is false. http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceAutoConfiguration.java b/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceAutoConfiguration.java index 9cb27a0..de80367 100644 --- a/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceAutoConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceAutoConfiguration.java @@ -33,7 +33,7 @@ import org.springframework.context.annotation.Scope; @Configuration @AutoConfigureBefore({ ClusteredRouteControllerAutoConfiguration.class, CamelAutoConfiguration.class }) -@ConditionalOnProperty(prefix = "camel.clustered.service.zookeeper", name = "enabled") +@ConditionalOnProperty(prefix = "camel.component.zookeeper.cluster.service", name = "enabled") @EnableConfigurationProperties(ZooKeeperClusterServiceConfiguration.class) public class ZooKeeperClusterServiceAutoConfiguration { @Autowired http://git-wip-us.apache.org/repos/asf/camel/blob/8ef99f5c/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceConfiguration.java b/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceConfiguration.java index 620ec0e..cb66058 100644 --- a/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceConfiguration.java @@ -19,7 +19,7 @@ package org.apache.camel.component.zookeeper.ha.springboot; import org.apache.camel.component.zookeeper.ZooKeeperCuratorConfiguration; import org.springframework.boot.context.properties.ConfigurationProperties; -@ConfigurationProperties(prefix = "camel.clustered.service.zookeeper") +@ConfigurationProperties(prefix = "camel.component.zookeeper.cluster.service") public class ZooKeeperClusterServiceConfiguration extends ZooKeeperCuratorConfiguration { /** * Sets if the zookeeper cluster service should be enabled or not, default is false.