Repository: camel Updated Branches: refs/heads/master 0c90ebdc4 -> ea4a722bc
CAMEL-11800: cluster service : there should be an option to leave a cluster view Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/96a7cfdd Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/96a7cfdd Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/96a7cfdd Branch: refs/heads/master Commit: 96a7cfdd691c35ca5c8d66de60103fa799aec9eb Parents: dfbf7cc Author: lburgazzoli <lburgazz...@gmail.com> Authored: Fri Sep 22 15:02:13 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Mon Sep 25 14:20:59 2017 +0200 ---------------------------------------------------------------------- .../apache/camel/ha/CamelClusterService.java | 25 ++- .../camel/ha/CamelClusterServiceHelper.java | 41 +++++ .../org/apache/camel/ha/CamelClusterView.java | 1 + .../impl/ha/AbstractCamelClusterService.java | 92 ++++++++--- .../impl/health/DefaultHealthCheckService.java | 2 +- .../camel/util/concurrent/LockHelper.java | 8 +- .../camel/impl/ha/CamelClusterServiceTest.java | 155 +++++++++++++++++++ .../camel/impl/ha/CamelClusterViewTest.java | 128 --------------- .../component/atomix/ha/AtomixClusterView.java | 12 +- .../AtomixClientClusteredRoutePolicyMain.java | 66 ++++++++ .../src/test/resources/log4j2.properties | 3 +- .../component/consul/ha/ConsulClusterView.java | 74 +++++---- .../ha/ConsulClusteredRoutePolicyMain.java | 61 ++++++++ .../src/test/resources/log4j2.properties | 8 +- .../zookeeper/ha/ZooKeeperClusterView.java | 5 +- .../ha/ZooKeeperClusteredRoutePolicyMain.java | 63 ++++++++ 16 files changed, 555 insertions(+), 189 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java index addc86d..6e99afb 100644 --- a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java +++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java @@ -16,6 +16,8 @@ */ package org.apache.camel.ha; +import java.util.Collection; + import org.apache.camel.CamelContextAware; import org.apache.camel.Service; import org.apache.camel.spi.IdAware; @@ -23,10 +25,10 @@ import org.apache.camel.spi.IdAware; public interface CamelClusterService extends Service, CamelContextAware, IdAware { /** - * Get a view of the cluster bound to a namespace creating it if needed. - * - * Multiple calls to this method with the same namespace should return the - * same instance. + * Get a view of the cluster bound to a namespace creating it if needed. Multiple + * calls to this method with the same namespace should return the same instance. + * The instance is automatically started the first time it is instantiated and + * if the cluster service is ready. * * @param namespace the namespace the view refer to. * @return the view. @@ -43,6 +45,21 @@ public interface CamelClusterService extends Service, CamelContextAware, IdAware void releaseView(CamelClusterView view) throws Exception; /** + * Return the namespaces handled by this service. + */ + Collection<String> getNamespaces(); + + /** + * Force start of the view associated to the give namespace. + */ + void startView(String namespace) throws Exception; + + /** + * Force stop of the view associated to the give namespace. + */ + void stopView(String namespace) throws Exception; + + /** * Access the underlying concrete CamelClusterService implementation to * provide access to further features. * http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/main/java/org/apache/camel/ha/CamelClusterServiceHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterServiceHelper.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterServiceHelper.java new file mode 100644 index 0000000..96cc292 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterServiceHelper.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.ha; + +import java.util.Optional; +import java.util.function.Predicate; + +import org.apache.camel.CamelContext; + +public final class CamelClusterServiceHelper { + private CamelClusterServiceHelper() { + } + + public static Optional<CamelClusterService> lookupService(CamelContext context) { + return Optional.ofNullable(context.hasService(CamelClusterService.class)); + } + + public static Optional<CamelClusterService> lookupService(CamelContext context, Predicate<CamelClusterService> selector) { + for (CamelClusterService service: context.hasServices(CamelClusterService.class)) { + if (selector.test(service)) { + return Optional.of(service); + } + } + + return Optional.empty(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java index a1eb12e..5398d44 100644 --- a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java +++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java @@ -16,6 +16,7 @@ */ package org.apache.camel.ha; +import java.util.Collection; import java.util.List; import java.util.Optional; http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java index 7f98d37..35175db 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java @@ -16,7 +16,9 @@ */ package org.apache.camel.impl.ha; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.concurrent.locks.StampedLock; @@ -141,6 +143,52 @@ public abstract class AbstractCamelClusterService<T extends CamelClusterView> ex ); } + @Override + public Collection<String> getNamespaces() { + return LockHelper.supplyWithReadLock( + lock, + () -> { + // copy the key set so it is not modifiable and thread safe + // thus a little inefficient. + return new HashSet<>(views.keySet()); + } + ); + } + + @Override + public void startView(String namespace) throws Exception { + LockHelper.doWithWriteLockT( + lock, + () -> { + ViewHolder<T> holder = views.get(namespace); + + if (holder != null) { + LOGGER.info("Force start of view {}", namespace); + holder.startView(); + } else { + LOGGER.warn("Error forcing start of view {}: it does not exist", namespace); + } + } + ); + } + + @Override + public void stopView(String namespace) throws Exception { + LockHelper.doWithWriteLockT( + lock, + () -> { + ViewHolder<T> holder = views.get(namespace); + + if (holder != null) { + LOGGER.info("Force stop of view {}", namespace); + holder.stopView(); + } else { + LOGGER.warn("Error forcing stop of view {}: it does not exist", namespace); + } + } + ); + } + // ********************************** // Implementation // ********************************** @@ -157,14 +205,28 @@ public abstract class AbstractCamelClusterService<T extends CamelClusterView> ex ViewHolder(V view) { this.view = view; - this.count = ReferenceCount.on(this::startView, this::stopView); + this.count = ReferenceCount.on( + () -> { + try { + this.startView(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, + () -> { + try { + this.stopView(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } - public V get() { + V get() { return view; } - public V retain() { + V retain() { LOGGER.debug("Retain view {}, old-refs={}", view.getNamespace(), count.get()); count.retain(); @@ -172,30 +234,24 @@ public abstract class AbstractCamelClusterService<T extends CamelClusterView> ex return get(); } - public void release() { + void release() { LOGGER.debug("Release view {}, old-refs={}", view.getNamespace(), count.get()); count.release(); } - private void startView() { + void startView() throws Exception { if (AbstractCamelClusterService.this.isRunAllowed()) { - try { - LOGGER.debug("Start view {}", view.getNamespace()); - view.start(); - } catch (Exception e) { - throw new RuntimeException(e); - } + LOGGER.debug("Start view {}", view.getNamespace()); + view.start(); + } else { + LOGGER.debug("Can't start view {} as cluster service is not running, view will be started on service start-up", view.getNamespace()); } } - private void stopView() { - try { - LOGGER.debug("Stop view {}", view.getNamespace()); - view.stop(); - } catch (Exception e) { - throw new RuntimeException(e); - } + void stopView() throws Exception { + LOGGER.debug("Stop view {}", view.getNamespace()); + view.stop(); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java b/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java index 8fbbd0c..df60d94 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java +++ b/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java @@ -250,7 +250,7 @@ public final class DefaultHealthCheckService extends ServiceSupport implements H } private HealthCheck.Result invoke(HealthCheck check, Map<String, Object> options) { - return LockHelper.suppliWithWriteLock( + return LockHelper.supplyWithWriteLock( lock, () -> { LOGGER.debug("Invoke health-check {}", check.getId()); http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java b/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java index 9acbb96..bdf48a4 100644 --- a/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java @@ -57,7 +57,7 @@ public final class LockHelper { } } - public static <R> R suppliWithReadLock(StampedLock lock, Supplier<R> task) { + public static <R> R supplyWithReadLock(StampedLock lock, Supplier<R> task) { long stamp = lock.readLock(); try { @@ -67,7 +67,7 @@ public final class LockHelper { } } - public static <R, T extends Throwable> R suppliWithReadLockT(StampedLock lock, ThrowingSupplier<R, T> task) throws T { + public static <R, T extends Throwable> R supplyWithReadLockT(StampedLock lock, ThrowingSupplier<R, T> task) throws T { long stamp = lock.readLock(); try { @@ -97,7 +97,7 @@ public final class LockHelper { } } - public static <R> R suppliWithWriteLock(StampedLock lock, Supplier<R> task) { + public static <R> R supplyWithWriteLock(StampedLock lock, Supplier<R> task) { long stamp = lock.writeLock(); try { @@ -117,7 +117,7 @@ public final class LockHelper { } } - public static <R, T extends Throwable> R suppliWithWriteLockT(StampedLock lock, ThrowingSupplier<R, T> task) throws T { + public static <R, T extends Throwable> R supplyWithWriteLockT(StampedLock lock, ThrowingSupplier<R, T> task) throws T { long stamp = lock.writeLock(); try { http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterServiceTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterServiceTest.java b/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterServiceTest.java new file mode 100644 index 0000000..df45ff0 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterServiceTest.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.ha; + +import java.util.List; +import java.util.Optional; + +import org.apache.camel.ServiceStatus; +import org.apache.camel.ha.CamelClusterMember; +import org.apache.camel.ha.CamelClusterService; +import org.junit.Assert; +import org.junit.Test; + +public class CamelClusterServiceTest { + + @Test + public void testViewEquality() throws Exception { + TestClusterService service = new TestClusterService(); + TestClusterView view1 = service.getView("ns1").unwrap(TestClusterView.class); + TestClusterView view2 = service.getView("ns1").unwrap(TestClusterView.class); + TestClusterView view3 = service.getView("ns2").unwrap(TestClusterView.class); + + Assert.assertEquals(view1, view2); + Assert.assertNotEquals(view1, view3); + } + + @Test + public void testViewReferences() throws Exception { + TestClusterService service = new TestClusterService(); + service.start(); + + TestClusterView view1 = service.getView("ns1").unwrap(TestClusterView.class); + TestClusterView view2 = service.getView("ns1").unwrap(TestClusterView.class); + TestClusterView view3 = service.getView("ns2").unwrap(TestClusterView.class); + + Assert.assertEquals(ServiceStatus.Started, view1.getStatus()); + Assert.assertEquals(ServiceStatus.Started, view2.getStatus()); + Assert.assertEquals(ServiceStatus.Started, view3.getStatus()); + + service.releaseView(view1); + + Assert.assertEquals(ServiceStatus.Started, view1.getStatus()); + Assert.assertEquals(ServiceStatus.Started, view2.getStatus()); + Assert.assertEquals(ServiceStatus.Started, view3.getStatus()); + + service.releaseView(view2); + + Assert.assertEquals(ServiceStatus.Stopped, view1.getStatus()); + Assert.assertEquals(ServiceStatus.Stopped, view2.getStatus()); + Assert.assertEquals(ServiceStatus.Started, view3.getStatus()); + + service.releaseView(view3); + + TestClusterView newView1 = service.getView("ns1").unwrap(TestClusterView.class); + TestClusterView newView2 = service.getView("ns1").unwrap(TestClusterView.class); + + Assert.assertEquals(newView1, newView2); + Assert.assertEquals(view1, newView1); + Assert.assertEquals(view1, newView2); + + Assert.assertEquals(ServiceStatus.Started, newView1.getStatus()); + Assert.assertEquals(ServiceStatus.Started, newView2.getStatus()); + Assert.assertEquals(ServiceStatus.Stopped, view3.getStatus()); + + service.stop(); + + Assert.assertEquals(ServiceStatus.Stopped, view1.getStatus()); + Assert.assertEquals(ServiceStatus.Stopped, view2.getStatus()); + Assert.assertEquals(ServiceStatus.Stopped, view3.getStatus()); + Assert.assertEquals(ServiceStatus.Stopped, newView1.getStatus()); + Assert.assertEquals(ServiceStatus.Stopped, newView2.getStatus()); + } + + @Test + public void testViewForceOperations() throws Exception { + TestClusterService service = new TestClusterService(); + TestClusterView view = service.getView("ns1").unwrap(TestClusterView.class); + + Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); + + // This should not start the view as the service has not yet started. + service.startView(view.getNamespace()); + + Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); + + // This should start the view. + service.start(); + + Assert.assertEquals(ServiceStatus.Started, view.getStatus()); + + service.stopView(view.getNamespace()); + Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); + + service.startView(view.getNamespace()); + Assert.assertEquals(ServiceStatus.Started, view.getStatus()); + + service.releaseView(view); + Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); + } + + // ********************************* + // Helpers + // ********************************* + + private static class TestClusterView extends AbstractCamelClusterView { + + public TestClusterView(CamelClusterService cluster, String namespace) { + super(cluster, namespace); + } + + @Override + public Optional<CamelClusterMember> getMaster() { + return null; + } + + @Override + public CamelClusterMember getLocalMember() { + return null; + } + + @Override + public List<CamelClusterMember> getMembers() { + return null; + } + + @Override + protected void doStart() throws Exception { + } + + @Override + protected void doStop() throws Exception { + } + } + + private static class TestClusterService extends AbstractCamelClusterService<TestClusterView> { + @Override + protected TestClusterView createView(String namespace) throws Exception { + return new TestClusterView(this, namespace); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterViewTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterViewTest.java b/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterViewTest.java deleted file mode 100644 index 06cce29..0000000 --- a/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterViewTest.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.impl.ha; - -import java.util.List; -import java.util.Optional; - -import org.apache.camel.ServiceStatus; -import org.apache.camel.ha.CamelClusterMember; -import org.apache.camel.ha.CamelClusterService; -import org.junit.Assert; -import org.junit.Test; - -public class CamelClusterViewTest { - - @Test - public void testEquality() throws Exception { - TestClusterService service = new TestClusterService(); - TestClusterView view1 = service.getView("ns1").unwrap(TestClusterView.class); - TestClusterView view2 = service.getView("ns1").unwrap(TestClusterView.class); - TestClusterView view3 = service.getView("ns2").unwrap(TestClusterView.class); - - Assert.assertEquals(view1, view2); - Assert.assertNotEquals(view1, view3); - } - - @Test - public void testReferences() throws Exception { - TestClusterService service = new TestClusterService(); - service.start(); - - TestClusterView view1 = service.getView("ns1").unwrap(TestClusterView.class); - TestClusterView view2 = service.getView("ns1").unwrap(TestClusterView.class); - TestClusterView view3 = service.getView("ns2").unwrap(TestClusterView.class); - - Assert.assertEquals(ServiceStatus.Started, view1.getStatus()); - Assert.assertEquals(ServiceStatus.Started, view2.getStatus()); - Assert.assertEquals(ServiceStatus.Started, view3.getStatus()); - - service.releaseView(view1); - - Assert.assertEquals(ServiceStatus.Started, view1.getStatus()); - Assert.assertEquals(ServiceStatus.Started, view2.getStatus()); - Assert.assertEquals(ServiceStatus.Started, view3.getStatus()); - - service.releaseView(view2); - - Assert.assertEquals(ServiceStatus.Stopped, view1.getStatus()); - Assert.assertEquals(ServiceStatus.Stopped, view2.getStatus()); - Assert.assertEquals(ServiceStatus.Started, view3.getStatus()); - - service.releaseView(view3); - - TestClusterView newView1 = service.getView("ns1").unwrap(TestClusterView.class); - TestClusterView newView2 = service.getView("ns1").unwrap(TestClusterView.class); - - Assert.assertEquals(newView1, newView2); - Assert.assertEquals(view1, newView1); - Assert.assertEquals(view1, newView2); - - Assert.assertEquals(ServiceStatus.Started, newView1.getStatus()); - Assert.assertEquals(ServiceStatus.Started, newView2.getStatus()); - Assert.assertEquals(ServiceStatus.Stopped, view3.getStatus()); - - service.stop(); - - Assert.assertEquals(ServiceStatus.Stopped, view1.getStatus()); - Assert.assertEquals(ServiceStatus.Stopped, view2.getStatus()); - Assert.assertEquals(ServiceStatus.Stopped, view3.getStatus()); - Assert.assertEquals(ServiceStatus.Stopped, newView1.getStatus()); - Assert.assertEquals(ServiceStatus.Stopped, newView2.getStatus()); - } - - // ********************************* - // Helpers - // ********************************* - - private static class TestClusterView extends AbstractCamelClusterView { - - public TestClusterView(CamelClusterService cluster, String namespace) { - super(cluster, namespace); - } - - @Override - public Optional<CamelClusterMember> getMaster() { - return null; - } - - @Override - public CamelClusterMember getLocalMember() { - return null; - } - - @Override - public List<CamelClusterMember> getMembers() { - return null; - } - - @Override - protected void doStart() throws Exception { - } - - @Override - protected void doStop() throws Exception { - } - } - - private static class TestClusterService extends AbstractCamelClusterService<TestClusterView> { - @Override - protected TestClusterView createView(String namespace) throws Exception { - return new TestClusterView(this, namespace); - } - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java index 4be80a1..e8c1dcb 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java @@ -175,8 +175,16 @@ final class AtomixClusterView extends AbstractCamelClusterView { AtomixLocalMember leave() { if (member != null) { - LOGGER.debug("Leaving group {}", group); - member.leave(); + String id = member.id(); + + LOGGER.debug("Member {} : leave group {}", id, group); + + member.leave().join(); + group.remove(id).join(); + + member = null; + + fireLeadershipChangedEvent(null); } return this; http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java new file mode 100644 index 0000000..ca9944c --- /dev/null +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.ha; + +import java.util.UUID; + +import io.atomix.catalyst.transport.Address; +import io.atomix.copycat.server.storage.StorageLevel; +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.ExplicitCamelContextNameStrategy; +import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.main.Main; +import org.apache.camel.main.MainListenerSupport; + +public class AtomixClientClusteredRoutePolicyMain { + + public static void main(String[] args) throws Exception { + final String id = UUID.randomUUID().toString(); + + Main main = new Main(); + main.addMainListener(new MainListenerSupport() { + @Override + public void configure(CamelContext context) { + try { + AtomixClusterService service = new AtomixClusterService(); + service.setId("node-" + id); + service.setStorageLevel(StorageLevel.MEMORY); + service.setAddress(new Address(args[0])); + service.setNodes(args.length > 1 ? args[1] : args[0]); + + context.setNameStrategy(new ExplicitCamelContextNameStrategy("camel-" + id)); + context.addService(service); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + main.addRouteBuilder(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:clustered?delay=1s&period=1s") + .routeId("route-" + id) + .log("Route ${routeId} is running ..."); + } + }); + + main.run(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-atomix/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/resources/log4j2.properties b/components/camel-atomix/src/test/resources/log4j2.properties index d70cd9d..d424d7e 100644 --- a/components/camel-atomix/src/test/resources/log4j2.properties +++ b/components/camel-atomix/src/test/resources/log4j2.properties @@ -26,7 +26,7 @@ appender.out.layout.type = PatternLayout appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n logger.atomix.name = io.atomix -logger.atomix.level = WARN +logger.atomix.level = INFO logger.camel.name = org.apache.camel logger.camel.level = INFO @@ -38,4 +38,5 @@ logger.camel-impl-ha.name = org.apache.camel.impl.ha logger.camel-impl-ha.level = DEBUG rootLogger.level = INFO +#rootLogger.appenderRef.stdout.ref = out rootLogger.appenderRef.file.ref = file http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java index 4ec8be2..dfa85d2 100644 --- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java @@ -44,12 +44,12 @@ final class ConsulClusterView extends AbstractCamelClusterView { private final ConsulClusterConfiguration configuration; private final ConsulLocalMember localMember; private final ConsulClusterMember nullMember; + private final AtomicReference<String> sessionId; private final Watcher watcher; private Consul client; private SessionClient sessionClient; private KeyValueClient keyValueClient; - private String sessionId; private String path; ConsulClusterView(ConsulClusterService service, ConsulClusterConfiguration configuration, String namespace) throws Exception { @@ -58,6 +58,7 @@ final class ConsulClusterView extends AbstractCamelClusterView { this.configuration = configuration; this.localMember = new ConsulLocalMember(); this.nullMember = new ConsulClusterMember(); + this.sessionId = new AtomicReference<>(); this.watcher = new Watcher(); this.path = configuration.getRootPath() + "/" + namespace; } @@ -94,22 +95,24 @@ final class ConsulClusterView extends AbstractCamelClusterView { @Override protected void doStart() throws Exception { - if (sessionId == null) { + if (sessionId.get() == null) { client = configuration.createConsulClient(getCamelContext()); sessionClient = client.sessionClient(); keyValueClient = client.keyValueClient(); - sessionId = sessionClient.createSession( - ImmutableSession.builder() - .name(getNamespace()) - .ttl(configuration.getSessionTtl() + "s") - .lockDelay(configuration.getSessionLockDelay() + "s") - .build() - ).getId(); + sessionId.set( + sessionClient.createSession( + ImmutableSession.builder() + .name(getNamespace()) + .ttl(configuration.getSessionTtl() + "s") + .lockDelay(configuration.getSessionLockDelay() + "s") + .build() + ).getId() + ); - LOGGER.debug("Acquired session with id '{}'", sessionId); - boolean lock = keyValueClient.acquireLock(this.path, sessionId); - LOGGER.debug("Acquire lock on path '{}' with id '{}' result '{}'", path, sessionId, lock); + LOGGER.debug("Acquired session with id '{}'", sessionId.get()); + boolean lock = acquireLock(); + LOGGER.debug("Acquire lock on path '{}' with id '{}' result '{}'", path, sessionId.get(), lock); localMember.setMaster(lock); watcher.watch(); @@ -118,16 +121,24 @@ final class ConsulClusterView extends AbstractCamelClusterView { @Override protected void doStop() throws Exception { - if (sessionId != null) { - if (keyValueClient.releaseLock(this.path, sessionId)) { - LOGGER.debug("Successfully released lock on path '{}' with id '{}'", path, sessionId); + if (sessionId.get() != null) { + if (keyValueClient.releaseLock(this.path, sessionId.get())) { + LOGGER.debug("Successfully released lock on path '{}' with id '{}'", path, sessionId.get()); } - sessionClient.destroySession(sessionId); + sessionClient.destroySession(sessionId.getAndSet(null)); localMember.setMaster(false); } } + private boolean acquireLock() { + String sid = sessionId.get(); + + return (sid != null) + ? keyValueClient.acquireLock(this.path, sid) + : false; + } + // *********************************************** // // *********************************************** @@ -137,12 +148,12 @@ final class ConsulClusterView extends AbstractCamelClusterView { void setMaster(boolean master) { if (master && this.master.compareAndSet(false, true)) { - LOGGER.debug("Leadership taken for session id {}", sessionId); + LOGGER.debug("Leadership taken for session id {}", sessionId.get()); fireLeadershipChangedEvent(this); return; } if (!master && this.master.compareAndSet(true, false)) { - LOGGER.debug("Leadership lost for session id {}", sessionId); + LOGGER.debug("Leadership lost for session id {}", sessionId.get()); fireLeadershipChangedEvent(getMaster().orElse(nullMember)); return; } @@ -155,7 +166,7 @@ final class ConsulClusterView extends AbstractCamelClusterView { @Override public String getId() { - return sessionId; + return sessionId.get(); } @Override @@ -219,19 +230,24 @@ final class ConsulClusterView extends AbstractCamelClusterView { @Override public void onComplete(ConsulResponse<com.google.common.base.Optional<Value>> consulResponse) { - if (isRunAllowed()) { + if (isStarting() || isStarted()) { com.google.common.base.Optional<Value> value = consulResponse.getResponse(); if (value.isPresent()) { com.google.common.base.Optional<String> sid = value.get().getSession(); if (!sid.isPresent()) { // If the key is not held by any session, try acquire a // lock (become leader) - boolean lock = keyValueClient.acquireLock(configuration.getRootPath(), sessionId); - LOGGER.debug("Try to acquire lock on path '{}' with id '{}', result '{}'", path, sessionId, lock); + boolean lock = acquireLock(); + LOGGER.debug("Try to acquire lock on path '{}' with id '{}', result '{}'", path, sessionId.get(), lock); localMember.setMaster(lock); } else { - localMember.setMaster(sessionId.equals(sid.get())); + boolean master = sid.get().equals(sessionId.get()); + if (!master) { + LOGGER.debug("Path {} is held by session {}, local session is {}", path, sid.get(), sessionId.get()); + } + + localMember.setMaster(sid.get().equals(sessionId.get())); } } @@ -244,8 +260,8 @@ final class ConsulClusterView extends AbstractCamelClusterView { public void onFailure(Throwable throwable) { LOGGER.debug("", throwable); - if (sessionId != null) { - keyValueClient.releaseLock(configuration.getRootPath(), sessionId); + if (sessionId.get() != null) { + keyValueClient.releaseLock(configuration.getRootPath(), sessionId.get()); } localMember.setMaster(false); @@ -253,7 +269,11 @@ final class ConsulClusterView extends AbstractCamelClusterView { } public void watch() { - if (isRunAllowed()) { + if (sessionId.get() == null) { + return; + } + + if (isStarting() || isStarted()) { // Watch for changes keyValueClient.getValue( path, @@ -262,7 +282,7 @@ final class ConsulClusterView extends AbstractCamelClusterView { ); // Refresh session - sessionClient.renewSession(sessionId); + sessionClient.renewSession(sessionId.get()); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java new file mode 100644 index 0000000..b629899 --- /dev/null +++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.consul.ha; + +import java.util.UUID; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.ExplicitCamelContextNameStrategy; +import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.main.Main; +import org.apache.camel.main.MainListenerSupport; + +public class ConsulClusteredRoutePolicyMain { + public static void main(String[] args) throws Exception { + final String id = UUID.randomUUID().toString(); + + Main main = new Main(); + main.addMainListener(new MainListenerSupport() { + @Override + public void configure(CamelContext context) { + try { + ConsulClusterService service = new ConsulClusterService(); + service.setId("node-" + id); + service.setUrl(args[0]); + + context.setNameStrategy(new ExplicitCamelContextNameStrategy("camel-" + id)); + context.addService(service); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + main.addRouteBuilder(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:clustered?delay=1s&period=1s") + .routeId("route-" + id) + .log("Route ${routeId} is running ..."); + } + }); + + main.run(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-consul/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/test/resources/log4j2.properties b/components/camel-consul/src/test/resources/log4j2.properties index a1bcd10..c2510bc 100644 --- a/components/camel-consul/src/test/resources/log4j2.properties +++ b/components/camel-consul/src/test/resources/log4j2.properties @@ -26,10 +26,12 @@ appender.out.layout.type = PatternLayout appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n logger.consul.name = org.apache.camel.component.consul logger.consul.level = DEBUG -rootLogger.level = INFO -rootLogger.appenderRef.out.ref = file logger.camel-ha.name = org.apache.camel.ha logger.camel-ha.level = DEBUG logger.camel-impl-ha.name = org.apache.camel.impl.ha -logger.camel-impl-ha.level = DEBUG \ No newline at end of file +logger.camel-impl-ha.level = DEBUG + +rootLogger.level = INFO +#rootLogger.appenderRef.stdout.ref = out +rootLogger.appenderRef.out.ref = file \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java index acb1ad8..fd3ab73 100644 --- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java @@ -99,6 +99,9 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView { leaderSelector.interruptLeadership(); leaderSelector.close(); leaderSelector = null; + + localMember.setMaster(false); + fireLeadershipChangedEvent(getMaster().orElse(null)); } } @@ -118,7 +121,7 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView { } localMember.setMaster(false); - getMaster().ifPresent(leader -> fireLeadershipChangedEvent(leader)); + fireLeadershipChangedEvent(getMaster().orElse(null)); } } http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java new file mode 100644 index 0000000..6be67f2 --- /dev/null +++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper.ha; + +import java.util.UUID; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.ExplicitCamelContextNameStrategy; +import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.main.Main; +import org.apache.camel.main.MainListenerSupport; + +public final class ZooKeeperClusteredRoutePolicyMain { + + public static void main(String[] args) throws Exception { + final String id = UUID.randomUUID().toString(); + + Main main = new Main(); + main.addMainListener(new MainListenerSupport() { + @Override + public void configure(CamelContext context) { + try { + ZooKeeperClusterService service = new ZooKeeperClusterService(); + service.setId("node-" + id); + service.setNodes(args[0]); + service.setBasePath("/camel"); + + context.setNameStrategy(new ExplicitCamelContextNameStrategy("camel-" + id)); + context.addService(service); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + main.addRouteBuilder(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:clustered?delay=1s&period=1s") + .routeId("route-" + id) + .log("Route ${routeId} is running ..."); + } + }); + + main.run(); + } +}