Repository: camel Updated Branches: refs/heads/camel-2.20.x 72fe78719 -> 54b34c970
CAMEL-11900: cluster-service : only the first event listener is notified about cluster events Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/54b34c97 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/54b34c97 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/54b34c97 Branch: refs/heads/camel-2.20.x Commit: 54b34c9708c6ffac5c4eb45e42372881ddd48b94 Parents: 72fe787 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Thu Oct 12 14:59:01 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Thu Oct 12 15:12:38 2017 +0200 ---------------------------------------------------------------------- .../camel/impl/ha/AbstractCamelClusterView.java | 2 +- .../camel/impl/ha/ClusterServiceViewTest.java | 80 ++++++++++++++++++-- 2 files changed, 75 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/54b34c97/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java index 8a060c6..de91976 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java @@ -83,7 +83,7 @@ public abstract class AbstractCamelClusterView extends ServiceSupport implements lock, () -> { for (int i = 0; i < listeners.size(); i++) { - CamelClusterEventListener listener = listeners.get(0); + CamelClusterEventListener listener = listeners.get(i); if (type.isInstance(listener)) { consumer.accept(type.cast(listener)); http://git-wip-us.apache.org/repos/asf/camel/blob/54b34c97/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java b/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java index cc6090e..0948719 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java @@ -16,10 +16,19 @@ */ package org.apache.camel.impl.ha; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import org.apache.camel.ServiceStatus; +import org.apache.camel.ha.CamelClusterEventListener; import org.apache.camel.ha.CamelClusterMember; import org.apache.camel.ha.CamelClusterService; import org.junit.Assert; @@ -29,7 +38,7 @@ public class ClusterServiceViewTest { @Test public void testViewEquality() throws Exception { - TestClusterService service = new TestClusterService(); + TestClusterService service = new TestClusterService(UUID.randomUUID().toString()); TestClusterView view1 = service.getView("ns1").unwrap(TestClusterView.class); TestClusterView view2 = service.getView("ns1").unwrap(TestClusterView.class); TestClusterView view3 = service.getView("ns2").unwrap(TestClusterView.class); @@ -40,7 +49,7 @@ public class ClusterServiceViewTest { @Test public void testViewReferences() throws Exception { - TestClusterService service = new TestClusterService(); + TestClusterService service = new TestClusterService(UUID.randomUUID().toString()); service.start(); TestClusterView view1 = service.getView("ns1").unwrap(TestClusterView.class); @@ -87,7 +96,7 @@ public class ClusterServiceViewTest { @Test public void testViewForceOperations() throws Exception { - TestClusterService service = new TestClusterService(); + TestClusterService service = new TestClusterService(UUID.randomUUID().toString()); TestClusterView view = service.getView("ns1").unwrap(TestClusterView.class); Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); @@ -112,11 +121,37 @@ public class ClusterServiceViewTest { Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); } + @Test + public void testMultipleViewListeners() throws Exception { + final TestClusterService service = new TestClusterService(UUID.randomUUID().toString()); + final TestClusterView view = service.getView("ns1").unwrap(TestClusterView.class); + final int events = 1 + new Random().nextInt(10); + final Set<Integer> results = new HashSet<>(); + final CountDownLatch latch = new CountDownLatch(events); + + IntStream.range(0, events).forEach( + i -> view.addEventListener((CamelClusterEventListener.Leadership) (v, l) -> { + results.add(i); + latch.countDown(); + }) + ); + + service.start(); + view.setLeader(true); + + latch.await(10, TimeUnit.SECONDS); + + IntStream.range(0, events).forEach( + i -> Assert.assertTrue(results.contains(i)) + ); + } + // ********************************* // Helpers // ********************************* private static class TestClusterView extends AbstractCamelClusterView { + private boolean leader; public TestClusterView(CamelClusterService cluster, String namespace) { super(cluster, namespace); @@ -124,17 +159,34 @@ public class ClusterServiceViewTest { @Override public Optional<CamelClusterMember> getMaster() { - return null; + return leader + ? Optional.of(getLocalMember()) + : Optional.empty(); } @Override public CamelClusterMember getLocalMember() { - return null; + return new CamelClusterMember() { + @Override + public boolean isLeader() { + return leader; + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public String getId() { + return getClusterService().getId(); + } + }; } @Override public List<CamelClusterMember> getMembers() { - return null; + return Collections.emptyList(); } @Override @@ -144,9 +196,25 @@ public class ClusterServiceViewTest { @Override protected void doStop() throws Exception { } + + public boolean isLeader() { + return leader; + } + + public void setLeader(boolean leader) { + this.leader = leader; + + if (isRunAllowed()) { + fireLeadershipChangedEvent(getMaster()); + } + } } private static class TestClusterService extends AbstractCamelClusterService<TestClusterView> { + public TestClusterService(String id) { + super(id); + } + @Override protected TestClusterView createView(String namespace) throws Exception { return new TestClusterView(this, namespace);