CAMEL-11899: cluster-service: fire event on listener registration
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6682a356 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6682a356 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6682a356 Branch: refs/heads/master Commit: 6682a35622ec3b7546f3337bce0255564b641334 Parents: a232ab0 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Thu Oct 12 17:10:39 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Thu Oct 12 18:54:08 2017 +0200 ---------------------------------------------------------------------- .../camel/impl/ha/AbstractCamelClusterView.java | 32 +++++++++++++++++++- .../camel/impl/ha/ClusteredRoutePolicy.java | 1 - .../camel/impl/ha/ClusterServiceViewTest.java | 32 ++++++++++++++++++++ components/camel-consul/pom.xml | 2 +- .../camel/component/master/MasterConsumer.java | 4 --- .../zookeeper/ha/ZooKeeperClusterView.java | 7 +++++ .../src/test/resources/log4j2.properties | 4 ++- 7 files changed, 74 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java index de91976..a6d3e25 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java @@ -66,11 +66,41 @@ public abstract class AbstractCamelClusterView extends ServiceSupport implements @Override public void addEventListener(CamelClusterEventListener listener) { - LockHelper.doWithWriteLock(lock, () -> listeners.add(listener)); + if (listener == null) { + return; + } + + 
LockHelper.doWithWriteLock( + lock, + () -> { + listeners.add(listener); + + if (isRunAllowed()) { + // if the view has already been started, fire known events so + // the consumer can catch up. + + if (CamelClusterEventListener.Leadership.class.isInstance(listener)) { + CamelClusterEventListener.Leadership.class.cast(listener).leadershipChanged(this, getMaster()); + } + + if (CamelClusterEventListener.Membership.class.isInstance(listener)) { + CamelClusterEventListener.Membership ml = CamelClusterEventListener.Membership.class.cast(listener); + + for (CamelClusterMember member: getMembers()) { + ml.memberAdded(this, member); + } + } + } + } + ); } @Override public void removeEventListener(CamelClusterEventListener listener) { + if (listener == null) { + return; + } + LockHelper.doWithWriteLock(lock, () -> listeners.removeIf(l -> l == listener)); } http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java index b537ca7..a733394 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java @@ -283,7 +283,6 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca ); clusterView.addEventListener(leadershipEventListener); - setLeader(clusterView.getLocalMember().isLeader()); } // **************************************************** http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java 
b/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java index 0948719..2a6cddd 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java @@ -146,6 +146,38 @@ public class ClusterServiceViewTest { ); } + @Test + public void testLateViewListeners() throws Exception { + final TestClusterService service = new TestClusterService(UUID.randomUUID().toString()); + final TestClusterView view = service.getView("ns1").unwrap(TestClusterView.class); + final int events = 1 + new Random().nextInt(10); + final Set<Integer> results = new HashSet<>(); + final CountDownLatch latch = new CountDownLatch(events * 2); + + IntStream.range(0, events).forEach( + i -> view.addEventListener((CamelClusterEventListener.Leadership) (v, l) -> { + results.add(i); + latch.countDown(); + }) + ); + + service.start(); + view.setLeader(true); + + IntStream.range(events, events * 2).forEach( + i -> view.addEventListener((CamelClusterEventListener.Leadership) (v, l) -> { + results.add(i); + latch.countDown(); + }) + ); + + latch.await(10, TimeUnit.SECONDS); + + IntStream.range(0, events * 2).forEach( + i -> Assert.assertTrue(results.contains(i)) + ); + } + // ********************************* // Helpers // ********************************* http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/components/camel-consul/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-consul/pom.xml b/components/camel-consul/pom.xml index f84e750..9d92d85 100644 --- a/components/camel-consul/pom.xml +++ b/components/camel-consul/pom.xml @@ -174,7 +174,7 @@ <port>consul.port:8500</port> </ports> <wait> - <log>agent: Synced service 'consul'</log> + <log>agent: Synced node info</log> <time>20000</time> </wait> <cmd> 
http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java index c7edd91..8d3815f 100644 --- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java +++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java @@ -69,10 +69,6 @@ public class MasterConsumer extends DefaultConsumer { view = clusterService.getView(masterEndpoint.getNamespace()); view.addEventListener(leadershipListener); - - if (isMaster()) { - onLeadershipTaken(); - } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java index 6dab19e..3d313ab 100644 --- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java @@ -31,6 +31,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; import org.apache.curator.framework.recipes.leader.Participant; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ 
-67,6 +68,9 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView { return ObjectHelper.equal(participant.getId(), localMember.getId()) ? Optional.of(localMember) : Optional.of(new CuratorClusterMember(participant)); + } catch (KeeperException.NoNodeException e) { + LOGGER.debug("Failed to get get master because node '{}' does not yet exist (error: '{}')", configuration.getBasePath(), e.getMessage()); + return Optional.empty(); } catch (Exception e) { throw new RuntimeCamelException(e); } @@ -83,6 +87,9 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView { .stream() .map(CuratorClusterMember::new) .collect(Collectors.toList()); + } catch (KeeperException.NoNodeException e) { + LOGGER.debug("Failed to get members because node '{}' does not yet exist (error: '{}')", configuration.getBasePath(), e.getMessage()); + return Collections.emptyList(); } catch (Exception e) { throw new RuntimeCamelException(e); } http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/components/camel-zookeeper/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/resources/log4j2.properties b/components/camel-zookeeper/src/test/resources/log4j2.properties index 1dda6f0..5318521 100644 --- a/components/camel-zookeeper/src/test/resources/log4j2.properties +++ b/components/camel-zookeeper/src/test/resources/log4j2.properties @@ -35,6 +35,8 @@ logger.camel-zookeeper.name = org.apache.camel.component.zookeeper logger.camel-zookeeper.level = INFO logger.camel-zookeeper-policy.name = org.apache.camel.component.zookeeper.policy logger.camel-zookeeper-policy.level = INFO +logger.camel-zookeeper-ha.name = org.apache.camel.component.zookeeper.ha +logger.camel-zookeeper-ha.level = DEBUG logger.camel-ha.name = org.apache.camel.ha logger.camel-ha.level = DEBUG logger.camel-impl-ha.name = org.apache.camel.impl.ha @@ -46,5 +48,5 @@ logger.springframework.name = 
org.springframework logger.springframework.level = WARN rootLogger.level = INFO -#rootLogger.appenderRef.stdout.ref = out +//rootLogger.appenderRef.stdout.ref = out rootLogger.appenderRef.file.ref = file