Repository: camel
Updated Branches:
  refs/heads/master 0c90ebdc4 -> ea4a722bc


CAMEL-11800: cluster service : there should be an option to leave a cluster view


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/96a7cfdd
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/96a7cfdd
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/96a7cfdd

Branch: refs/heads/master
Commit: 96a7cfdd691c35ca5c8d66de60103fa799aec9eb
Parents: dfbf7cc
Author: lburgazzoli <lburgazz...@gmail.com>
Authored: Fri Sep 22 15:02:13 2017 +0200
Committer: lburgazzoli <lburgazz...@gmail.com>
Committed: Mon Sep 25 14:20:59 2017 +0200

----------------------------------------------------------------------
 .../apache/camel/ha/CamelClusterService.java    |  25 ++-
 .../camel/ha/CamelClusterServiceHelper.java     |  41 +++++
 .../org/apache/camel/ha/CamelClusterView.java   |   1 +
 .../impl/ha/AbstractCamelClusterService.java    |  92 ++++++++---
 .../impl/health/DefaultHealthCheckService.java  |   2 +-
 .../camel/util/concurrent/LockHelper.java       |   8 +-
 .../camel/impl/ha/CamelClusterServiceTest.java  | 155 +++++++++++++++++++
 .../camel/impl/ha/CamelClusterViewTest.java     | 128 ---------------
 .../component/atomix/ha/AtomixClusterView.java  |  12 +-
 .../AtomixClientClusteredRoutePolicyMain.java   |  66 ++++++++
 .../src/test/resources/log4j2.properties        |   3 +-
 .../component/consul/ha/ConsulClusterView.java  |  74 +++++----
 .../ha/ConsulClusteredRoutePolicyMain.java      |  61 ++++++++
 .../src/test/resources/log4j2.properties        |   8 +-
 .../zookeeper/ha/ZooKeeperClusterView.java      |   5 +-
 .../ha/ZooKeeperClusteredRoutePolicyMain.java   |  63 ++++++++
 16 files changed, 555 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java 
b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java
index addc86d..6e99afb 100644
--- a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java
+++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.ha;
 
+import java.util.Collection;
+
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Service;
 import org.apache.camel.spi.IdAware;
@@ -23,10 +25,10 @@ import org.apache.camel.spi.IdAware;
 public interface CamelClusterService extends Service, CamelContextAware, 
IdAware {
 
     /**
-     * Get a view of the cluster bound to a namespace creating it if needed.
-     *
-     * Multiple calls to this method with the same namespace should return the
-     * same instance.
+     * Get a view of the cluster bound to a namespace creating it if needed. 
Multiple
+     * calls to this method with the same namespace should return the same 
instance.
+     * The instance is automatically started the first time it is instantiated 
and
+     * if the cluster service is ready.
      *
      * @param namespace the namespace the view refer to.
      * @return the view.
@@ -43,6 +45,21 @@ public interface CamelClusterService extends Service, 
CamelContextAware, IdAware
     void releaseView(CamelClusterView view) throws Exception;
 
     /**
+     * Return the namespaces handled by this service.
+     */
+    Collection<String> getNamespaces();
+
+    /**
+     * Force start of the view associated to the give namespace.
+     */
+    void startView(String namespace) throws Exception;
+
+    /**
+     * Force stop of the view associated to the give namespace.
+     */
+    void stopView(String namespace) throws Exception;
+
+    /**
      * Access the underlying concrete CamelClusterService implementation to
      * provide access to further features.
      *

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/main/java/org/apache/camel/ha/CamelClusterServiceHelper.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterServiceHelper.java 
b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterServiceHelper.java
new file mode 100644
index 0000000..96cc292
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterServiceHelper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.ha;
+
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import org.apache.camel.CamelContext;
+
+public final class CamelClusterServiceHelper {
+    private CamelClusterServiceHelper() {
+    }
+
+    public static Optional<CamelClusterService> lookupService(CamelContext 
context) {
+        return 
Optional.ofNullable(context.hasService(CamelClusterService.class));
+    }
+
+    public static Optional<CamelClusterService> lookupService(CamelContext 
context, Predicate<CamelClusterService> selector) {
+        for (CamelClusterService service: 
context.hasServices(CamelClusterService.class)) {
+            if (selector.test(service)) {
+                return Optional.of(service);
+            }
+        }
+
+        return Optional.empty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java 
b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java
index a1eb12e..5398d44 100644
--- a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java
+++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.ha;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java
 
b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java
index 7f98d37..35175db 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.impl.ha;
 
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.locks.StampedLock;
 
@@ -141,6 +143,52 @@ public abstract class AbstractCamelClusterService<T 
extends CamelClusterView> ex
         );
     }
 
+    @Override
+    public Collection<String> getNamespaces() {
+        return LockHelper.supplyWithReadLock(
+            lock,
+            () -> {
+                // copy the key set so it is not modifiable and thread safe
+                // thus a little inefficient.
+                return new HashSet<>(views.keySet());
+            }
+        );
+    }
+
+    @Override
+    public void startView(String namespace) throws Exception {
+        LockHelper.doWithWriteLockT(
+            lock,
+            () -> {
+                ViewHolder<T> holder = views.get(namespace);
+
+                if (holder != null) {
+                    LOGGER.info("Force start of view {}", namespace);
+                    holder.startView();
+                } else {
+                    LOGGER.warn("Error forcing start of view {}: it does not 
exist", namespace);
+                }
+            }
+        );
+    }
+
+    @Override
+    public void stopView(String namespace) throws Exception {
+        LockHelper.doWithWriteLockT(
+            lock,
+            () -> {
+                ViewHolder<T> holder = views.get(namespace);
+
+                if (holder != null) {
+                    LOGGER.info("Force stop of view {}", namespace);
+                    holder.stopView();
+                } else {
+                    LOGGER.warn("Error forcing stop of view {}: it does not 
exist", namespace);
+                }
+            }
+        );
+    }
+
     // **********************************
     // Implementation
     // **********************************
@@ -157,14 +205,28 @@ public abstract class AbstractCamelClusterService<T 
extends CamelClusterView> ex
 
         ViewHolder(V view) {
             this.view = view;
-            this.count = ReferenceCount.on(this::startView, this::stopView);
+            this.count = ReferenceCount.on(
+                () -> {
+                    try {
+                        this.startView();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                },
+                () -> {
+                    try {
+                        this.stopView();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
         }
 
-        public V get() {
+        V get() {
             return view;
         }
 
-        public V retain() {
+        V retain() {
             LOGGER.debug("Retain view {}, old-refs={}", view.getNamespace(), 
count.get());
 
             count.retain();
@@ -172,30 +234,24 @@ public abstract class AbstractCamelClusterService<T 
extends CamelClusterView> ex
             return get();
         }
 
-        public void release() {
+        void release() {
             LOGGER.debug("Release view {}, old-refs={}", view.getNamespace(), 
count.get());
 
             count.release();
         }
 
-        private void startView() {
+        void startView() throws Exception {
             if (AbstractCamelClusterService.this.isRunAllowed()) {
-                try {
-                    LOGGER.debug("Start view {}", view.getNamespace());
-                    view.start();
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
+                LOGGER.debug("Start view {}", view.getNamespace());
+                view.start();
+            } else {
+                LOGGER.debug("Can't start view {} as cluster service is not 
running, view will be started on service start-up", view.getNamespace());
             }
         }
 
-        private void stopView() {
-            try {
-                LOGGER.debug("Stop view {}", view.getNamespace());
-                view.stop();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+        void stopView() throws Exception {
+            LOGGER.debug("Stop view {}", view.getNamespace());
+            view.stop();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java
 
b/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java
index 8fbbd0c..df60d94 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java
@@ -250,7 +250,7 @@ public final class DefaultHealthCheckService extends 
ServiceSupport implements H
     }
 
     private HealthCheck.Result invoke(HealthCheck check, Map<String, Object> 
options) {
-        return LockHelper.suppliWithWriteLock(
+        return LockHelper.supplyWithWriteLock(
             lock,
             () -> {
                 LOGGER.debug("Invoke health-check {}", check.getId());

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java 
b/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java
index 9acbb96..bdf48a4 100644
--- a/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java
@@ -57,7 +57,7 @@ public final class LockHelper {
         }
     }
 
-    public static <R> R suppliWithReadLock(StampedLock lock, Supplier<R> task) 
 {
+    public static <R> R supplyWithReadLock(StampedLock lock, Supplier<R> task) 
 {
         long stamp = lock.readLock();
 
         try {
@@ -67,7 +67,7 @@ public final class LockHelper {
         }
     }
 
-    public static <R, T extends Throwable> R suppliWithReadLockT(StampedLock 
lock, ThrowingSupplier<R, T> task) throws T {
+    public static <R, T extends Throwable> R supplyWithReadLockT(StampedLock 
lock, ThrowingSupplier<R, T> task) throws T {
         long stamp = lock.readLock();
 
         try {
@@ -97,7 +97,7 @@ public final class LockHelper {
         }
     }
 
-    public static <R> R suppliWithWriteLock(StampedLock lock, Supplier<R> 
task)  {
+    public static <R> R supplyWithWriteLock(StampedLock lock, Supplier<R> 
task)  {
         long stamp = lock.writeLock();
 
         try {
@@ -117,7 +117,7 @@ public final class LockHelper {
         }
     }
 
-    public static <R, T extends Throwable> R suppliWithWriteLockT(StampedLock 
lock, ThrowingSupplier<R, T> task) throws T {
+    public static <R, T extends Throwable> R supplyWithWriteLockT(StampedLock 
lock, ThrowingSupplier<R, T> task) throws T {
         long stamp = lock.writeLock();
 
         try {

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterServiceTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterServiceTest.java
 
b/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterServiceTest.java
new file mode 100644
index 0000000..df45ff0
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterServiceTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.ha;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.ha.CamelClusterMember;
+import org.apache.camel.ha.CamelClusterService;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CamelClusterServiceTest {
+
+    @Test
+    public void testViewEquality() throws Exception {
+        TestClusterService service = new TestClusterService();
+        TestClusterView view1 = 
service.getView("ns1").unwrap(TestClusterView.class);
+        TestClusterView view2 = 
service.getView("ns1").unwrap(TestClusterView.class);
+        TestClusterView view3 = 
service.getView("ns2").unwrap(TestClusterView.class);
+
+        Assert.assertEquals(view1, view2);
+        Assert.assertNotEquals(view1, view3);
+    }
+
+    @Test
+    public void testViewReferences() throws Exception {
+        TestClusterService service = new TestClusterService();
+        service.start();
+
+        TestClusterView view1 = 
service.getView("ns1").unwrap(TestClusterView.class);
+        TestClusterView view2 = 
service.getView("ns1").unwrap(TestClusterView.class);
+        TestClusterView view3 = 
service.getView("ns2").unwrap(TestClusterView.class);
+
+        Assert.assertEquals(ServiceStatus.Started, view1.getStatus());
+        Assert.assertEquals(ServiceStatus.Started, view2.getStatus());
+        Assert.assertEquals(ServiceStatus.Started, view3.getStatus());
+
+        service.releaseView(view1);
+
+        Assert.assertEquals(ServiceStatus.Started, view1.getStatus());
+        Assert.assertEquals(ServiceStatus.Started, view2.getStatus());
+        Assert.assertEquals(ServiceStatus.Started, view3.getStatus());
+
+        service.releaseView(view2);
+
+        Assert.assertEquals(ServiceStatus.Stopped, view1.getStatus());
+        Assert.assertEquals(ServiceStatus.Stopped, view2.getStatus());
+        Assert.assertEquals(ServiceStatus.Started, view3.getStatus());
+
+        service.releaseView(view3);
+
+        TestClusterView newView1 = 
service.getView("ns1").unwrap(TestClusterView.class);
+        TestClusterView newView2 = 
service.getView("ns1").unwrap(TestClusterView.class);
+
+        Assert.assertEquals(newView1, newView2);
+        Assert.assertEquals(view1, newView1);
+        Assert.assertEquals(view1, newView2);
+
+        Assert.assertEquals(ServiceStatus.Started, newView1.getStatus());
+        Assert.assertEquals(ServiceStatus.Started, newView2.getStatus());
+        Assert.assertEquals(ServiceStatus.Stopped, view3.getStatus());
+
+        service.stop();
+
+        Assert.assertEquals(ServiceStatus.Stopped, view1.getStatus());
+        Assert.assertEquals(ServiceStatus.Stopped, view2.getStatus());
+        Assert.assertEquals(ServiceStatus.Stopped, view3.getStatus());
+        Assert.assertEquals(ServiceStatus.Stopped, newView1.getStatus());
+        Assert.assertEquals(ServiceStatus.Stopped, newView2.getStatus());
+    }
+
+    @Test
+    public void testViewForceOperations() throws Exception {
+        TestClusterService service = new TestClusterService();
+        TestClusterView view = 
service.getView("ns1").unwrap(TestClusterView.class);
+
+        Assert.assertEquals(ServiceStatus.Stopped, view.getStatus());
+
+        // This should not start the view as the service has not yet started.
+        service.startView(view.getNamespace());
+
+        Assert.assertEquals(ServiceStatus.Stopped, view.getStatus());
+
+        // This should start the view.
+        service.start();
+
+        Assert.assertEquals(ServiceStatus.Started, view.getStatus());
+
+        service.stopView(view.getNamespace());
+        Assert.assertEquals(ServiceStatus.Stopped, view.getStatus());
+
+        service.startView(view.getNamespace());
+        Assert.assertEquals(ServiceStatus.Started, view.getStatus());
+
+        service.releaseView(view);
+        Assert.assertEquals(ServiceStatus.Stopped, view.getStatus());
+    }
+
+    // *********************************
+    // Helpers
+    // *********************************
+
+    private static class TestClusterView extends AbstractCamelClusterView {
+
+        public TestClusterView(CamelClusterService cluster, String namespace) {
+            super(cluster, namespace);
+        }
+
+        @Override
+        public Optional<CamelClusterMember> getMaster() {
+            return null;
+        }
+
+        @Override
+        public CamelClusterMember getLocalMember() {
+            return null;
+        }
+
+        @Override
+        public List<CamelClusterMember> getMembers() {
+            return null;
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+        }
+    }
+
+    private static class TestClusterService extends 
AbstractCamelClusterService<TestClusterView> {
+        @Override
+        protected TestClusterView createView(String namespace) throws 
Exception {
+            return new TestClusterView(this, namespace);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterViewTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterViewTest.java 
b/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterViewTest.java
deleted file mode 100644
index 06cce29..0000000
--- 
a/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterViewTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.impl.ha;
-
-import java.util.List;
-import java.util.Optional;
-
-import org.apache.camel.ServiceStatus;
-import org.apache.camel.ha.CamelClusterMember;
-import org.apache.camel.ha.CamelClusterService;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class CamelClusterViewTest {
-
-    @Test
-    public void testEquality() throws Exception {
-        TestClusterService service = new TestClusterService();
-        TestClusterView view1 = 
service.getView("ns1").unwrap(TestClusterView.class);
-        TestClusterView view2 = 
service.getView("ns1").unwrap(TestClusterView.class);
-        TestClusterView view3 = 
service.getView("ns2").unwrap(TestClusterView.class);
-
-        Assert.assertEquals(view1, view2);
-        Assert.assertNotEquals(view1, view3);
-    }
-
-    @Test
-    public void testReferences() throws Exception {
-        TestClusterService service = new TestClusterService();
-        service.start();
-
-        TestClusterView view1 = 
service.getView("ns1").unwrap(TestClusterView.class);
-        TestClusterView view2 = 
service.getView("ns1").unwrap(TestClusterView.class);
-        TestClusterView view3 = 
service.getView("ns2").unwrap(TestClusterView.class);
-
-        Assert.assertEquals(ServiceStatus.Started, view1.getStatus());
-        Assert.assertEquals(ServiceStatus.Started, view2.getStatus());
-        Assert.assertEquals(ServiceStatus.Started, view3.getStatus());
-
-        service.releaseView(view1);
-
-        Assert.assertEquals(ServiceStatus.Started, view1.getStatus());
-        Assert.assertEquals(ServiceStatus.Started, view2.getStatus());
-        Assert.assertEquals(ServiceStatus.Started, view3.getStatus());
-
-        service.releaseView(view2);
-
-        Assert.assertEquals(ServiceStatus.Stopped, view1.getStatus());
-        Assert.assertEquals(ServiceStatus.Stopped, view2.getStatus());
-        Assert.assertEquals(ServiceStatus.Started, view3.getStatus());
-
-        service.releaseView(view3);
-
-        TestClusterView newView1 = 
service.getView("ns1").unwrap(TestClusterView.class);
-        TestClusterView newView2 = 
service.getView("ns1").unwrap(TestClusterView.class);
-
-        Assert.assertEquals(newView1, newView2);
-        Assert.assertEquals(view1, newView1);
-        Assert.assertEquals(view1, newView2);
-
-        Assert.assertEquals(ServiceStatus.Started, newView1.getStatus());
-        Assert.assertEquals(ServiceStatus.Started, newView2.getStatus());
-        Assert.assertEquals(ServiceStatus.Stopped, view3.getStatus());
-
-        service.stop();
-
-        Assert.assertEquals(ServiceStatus.Stopped, view1.getStatus());
-        Assert.assertEquals(ServiceStatus.Stopped, view2.getStatus());
-        Assert.assertEquals(ServiceStatus.Stopped, view3.getStatus());
-        Assert.assertEquals(ServiceStatus.Stopped, newView1.getStatus());
-        Assert.assertEquals(ServiceStatus.Stopped, newView2.getStatus());
-    }
-
-    // *********************************
-    // Helpers
-    // *********************************
-
-    private static class TestClusterView extends AbstractCamelClusterView {
-
-        public TestClusterView(CamelClusterService cluster, String namespace) {
-            super(cluster, namespace);
-        }
-
-        @Override
-        public Optional<CamelClusterMember> getMaster() {
-            return null;
-        }
-
-        @Override
-        public CamelClusterMember getLocalMember() {
-            return null;
-        }
-
-        @Override
-        public List<CamelClusterMember> getMembers() {
-            return null;
-        }
-
-        @Override
-        protected void doStart() throws Exception {
-        }
-
-        @Override
-        protected void doStop() throws Exception {
-        }
-    }
-
-    private static class TestClusterService extends 
AbstractCamelClusterService<TestClusterView> {
-        @Override
-        protected TestClusterView createView(String namespace) throws 
Exception {
-            return new TestClusterView(this, namespace);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
index 4be80a1..e8c1dcb 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
@@ -175,8 +175,16 @@ final class AtomixClusterView extends 
AbstractCamelClusterView {
 
         AtomixLocalMember leave() {
             if (member != null) {
-                LOGGER.debug("Leaving group {}", group);
-                member.leave();
+                String id = member.id();
+
+                LOGGER.debug("Member {} : leave group {}", id, group);
+
+                member.leave().join();
+                group.remove(id).join();
+
+                member = null;
+
+                fireLeadershipChangedEvent(null);
             }
 
             return this;

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java
 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java
new file mode 100644
index 0000000..ca9944c
--- /dev/null
+++ 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.ha;
+
+import java.util.UUID;
+
+import io.atomix.catalyst.transport.Address;
+import io.atomix.copycat.server.storage.StorageLevel;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.ExplicitCamelContextNameStrategy;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.main.Main;
+import org.apache.camel.main.MainListenerSupport;
+
+public class AtomixClientClusteredRoutePolicyMain {
+
+    public static void main(String[] args) throws Exception {
+        final String id = UUID.randomUUID().toString();
+
+        Main main = new Main();
+        main.addMainListener(new MainListenerSupport() {
+            @Override
+            public void configure(CamelContext context) {
+                try {
+                    AtomixClusterService service = new AtomixClusterService();
+                    service.setId("node-" + id);
+                    service.setStorageLevel(StorageLevel.MEMORY);
+                    service.setAddress(new Address(args[0]));
+                    service.setNodes(args.length > 1 ? args[1] : args[0]);
+
+                    context.setNameStrategy(new 
ExplicitCamelContextNameStrategy("camel-" + id));
+                    context.addService(service);
+                    
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        main.addRouteBuilder(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer:clustered?delay=1s&period=1s")
+                    .routeId("route-" + id)
+                    .log("Route ${routeId} is running ...");
+            }
+        });
+
+        main.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-atomix/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/test/resources/log4j2.properties 
b/components/camel-atomix/src/test/resources/log4j2.properties
index d70cd9d..d424d7e 100644
--- a/components/camel-atomix/src/test/resources/log4j2.properties
+++ b/components/camel-atomix/src/test/resources/log4j2.properties
@@ -26,7 +26,7 @@ appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
 
 logger.atomix.name = io.atomix
-logger.atomix.level = WARN
+logger.atomix.level = INFO
 
 logger.camel.name = org.apache.camel
 logger.camel.level = INFO
@@ -38,4 +38,5 @@ logger.camel-impl-ha.name = org.apache.camel.impl.ha
 logger.camel-impl-ha.level = DEBUG
 
 rootLogger.level = INFO
+#rootLogger.appenderRef.stdout.ref = out
 rootLogger.appenderRef.file.ref = file

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java
----------------------------------------------------------------------
diff --git 
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java
 
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java
index 4ec8be2..dfa85d2 100644
--- 
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java
+++ 
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java
@@ -44,12 +44,12 @@ final class ConsulClusterView extends 
AbstractCamelClusterView {
     private final ConsulClusterConfiguration configuration;
     private final ConsulLocalMember localMember;
     private final ConsulClusterMember nullMember;
+    private final AtomicReference<String> sessionId;
     private final Watcher watcher;
 
     private Consul client;
     private SessionClient sessionClient;
     private KeyValueClient keyValueClient;
-    private String sessionId;
     private String path;
 
     ConsulClusterView(ConsulClusterService service, ConsulClusterConfiguration 
configuration, String namespace) throws Exception {
@@ -58,6 +58,7 @@ final class ConsulClusterView extends 
AbstractCamelClusterView {
         this.configuration = configuration;
         this.localMember = new ConsulLocalMember();
         this.nullMember = new ConsulClusterMember();
+        this.sessionId = new AtomicReference<>();
         this.watcher = new Watcher();
         this.path = configuration.getRootPath() + "/" + namespace;
     }
@@ -94,22 +95,24 @@ final class ConsulClusterView extends 
AbstractCamelClusterView {
 
     @Override
     protected void doStart() throws Exception {
-        if (sessionId == null) {
+        if (sessionId.get() == null) {
             client = configuration.createConsulClient(getCamelContext());
             sessionClient = client.sessionClient();
             keyValueClient = client.keyValueClient();
 
-            sessionId = sessionClient.createSession(
-                ImmutableSession.builder()
-                    .name(getNamespace())
-                    .ttl(configuration.getSessionTtl() + "s")
-                    .lockDelay(configuration.getSessionLockDelay() + "s")
-                    .build()
-            ).getId();
+            sessionId.set(
+                sessionClient.createSession(
+                    ImmutableSession.builder()
+                        .name(getNamespace())
+                        .ttl(configuration.getSessionTtl() + "s")
+                        .lockDelay(configuration.getSessionLockDelay() + "s")
+                        .build()
+                ).getId()
+            );
 
-            LOGGER.debug("Acquired session with id '{}'", sessionId);
-            boolean lock = keyValueClient.acquireLock(this.path, sessionId);
-            LOGGER.debug("Acquire lock on path '{}' with id '{}' result '{}'", 
path, sessionId, lock);
+            LOGGER.debug("Acquired session with id '{}'", sessionId.get());
+            boolean lock = acquireLock();
+            LOGGER.debug("Acquire lock on path '{}' with id '{}' result '{}'", 
path, sessionId.get(), lock);
 
             localMember.setMaster(lock);
             watcher.watch();
@@ -118,16 +121,24 @@ final class ConsulClusterView extends 
AbstractCamelClusterView {
 
     @Override
     protected void doStop() throws Exception {
-        if (sessionId != null) {
-            if (keyValueClient.releaseLock(this.path, sessionId)) {
-                LOGGER.debug("Successfully released lock on path '{}' with id 
'{}'", path, sessionId);
+        if (sessionId.get() != null) {
+            if (keyValueClient.releaseLock(this.path, sessionId.get())) {
+                LOGGER.debug("Successfully released lock on path '{}' with id 
'{}'", path, sessionId.get());
             }
 
-            sessionClient.destroySession(sessionId);
+            sessionClient.destroySession(sessionId.getAndSet(null));
             localMember.setMaster(false);
         }
     }
 
+    private boolean acquireLock() {
+        String sid = sessionId.get();
+
+        return (sid != null)
+            ? keyValueClient.acquireLock(this.path, sid)
+            : false;
+    }
+
     // ***********************************************
     //
     // ***********************************************
@@ -137,12 +148,12 @@ final class ConsulClusterView extends 
AbstractCamelClusterView {
 
         void setMaster(boolean master) {
             if (master && this.master.compareAndSet(false, true)) {
-                LOGGER.debug("Leadership taken for session id {}", sessionId);
+                LOGGER.debug("Leadership taken for session id {}", 
sessionId.get());
                 fireLeadershipChangedEvent(this);
                 return;
             }
             if (!master && this.master.compareAndSet(true, false)) {
-                LOGGER.debug("Leadership lost for session id {}", sessionId);
+                LOGGER.debug("Leadership lost for session id {}", 
sessionId.get());
                 fireLeadershipChangedEvent(getMaster().orElse(nullMember));
                 return;
             }
@@ -155,7 +166,7 @@ final class ConsulClusterView extends 
AbstractCamelClusterView {
 
         @Override
         public String getId() {
-            return sessionId;
+            return sessionId.get();
         }
 
         @Override
@@ -219,19 +230,24 @@ final class ConsulClusterView extends 
AbstractCamelClusterView {
 
         @Override
         public void 
onComplete(ConsulResponse<com.google.common.base.Optional<Value>> 
consulResponse) {
-            if (isRunAllowed()) {
+            if (isStarting() || isStarted()) {
                 com.google.common.base.Optional<Value> value = 
consulResponse.getResponse();
                 if (value.isPresent()) {
                     com.google.common.base.Optional<String> sid = 
value.get().getSession();
                     if (!sid.isPresent()) {
                         // If the key is not held by any session, try acquire a
                         // lock (become leader)
-                        boolean lock = 
keyValueClient.acquireLock(configuration.getRootPath(), sessionId);
-                        LOGGER.debug("Try to acquire lock on path '{}' with id 
'{}', result '{}'", path, sessionId, lock);
+                        boolean lock = acquireLock();
+                        LOGGER.debug("Try to acquire lock on path '{}' with id 
'{}', result '{}'", path, sessionId.get(), lock);
 
                         localMember.setMaster(lock);
                     } else {
-                        localMember.setMaster(sessionId.equals(sid.get()));
+                        boolean master = sid.get().equals(sessionId.get());
+                        if (!master) {
+                            LOGGER.debug("Path {} is held by session {}, local 
session is {}", path, sid.get(), sessionId.get());
+                        }
+
+                        
localMember.setMaster(sid.get().equals(sessionId.get()));
                     }
                 }
 
@@ -244,8 +260,8 @@ final class ConsulClusterView extends 
AbstractCamelClusterView {
         public void onFailure(Throwable throwable) {
             LOGGER.debug("", throwable);
 
-            if (sessionId != null) {
-                keyValueClient.releaseLock(configuration.getRootPath(), 
sessionId);
+            if (sessionId.get() != null) {
+                keyValueClient.releaseLock(configuration.getRootPath(), 
sessionId.get());
             }
 
             localMember.setMaster(false);
@@ -253,7 +269,11 @@ final class ConsulClusterView extends 
AbstractCamelClusterView {
         }
 
         public void watch() {
-            if (isRunAllowed()) {
+            if (sessionId.get() == null) {
+                return;
+            }
+
+            if (isStarting() || isStarted()) {
                 // Watch for changes
                 keyValueClient.getValue(
                     path,
@@ -262,7 +282,7 @@ final class ConsulClusterView extends 
AbstractCamelClusterView {
                 );
 
                 // Refresh session
-                sessionClient.renewSession(sessionId);
+                sessionClient.renewSession(sessionId.get());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java
----------------------------------------------------------------------
diff --git 
a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java
 
b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java
new file mode 100644
index 0000000..b629899
--- /dev/null
+++ 
b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.ha;
+
+import java.util.UUID;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.ExplicitCamelContextNameStrategy;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.main.Main;
+import org.apache.camel.main.MainListenerSupport;
+
+public class ConsulClusteredRoutePolicyMain {
+    public static void main(String[] args) throws Exception {
+        final String id = UUID.randomUUID().toString();
+
+        Main main = new Main();
+        main.addMainListener(new MainListenerSupport() {
+            @Override
+            public void configure(CamelContext context) {
+                try {
+                    ConsulClusterService service = new ConsulClusterService();
+                    service.setId("node-" + id);
+                    service.setUrl(args[0]);
+
+                    context.setNameStrategy(new 
ExplicitCamelContextNameStrategy("camel-" + id));
+                    context.addService(service);
+                    
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        main.addRouteBuilder(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer:clustered?delay=1s&period=1s")
+                    .routeId("route-" + id)
+                    .log("Route ${routeId} is running ...");
+            }
+        });
+
+        main.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-consul/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/resources/log4j2.properties 
b/components/camel-consul/src/test/resources/log4j2.properties
index a1bcd10..c2510bc 100644
--- a/components/camel-consul/src/test/resources/log4j2.properties
+++ b/components/camel-consul/src/test/resources/log4j2.properties
@@ -26,10 +26,12 @@ appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
 logger.consul.name = org.apache.camel.component.consul
 logger.consul.level = DEBUG
-rootLogger.level = INFO
-rootLogger.appenderRef.out.ref = file
 
 logger.camel-ha.name = org.apache.camel.ha
 logger.camel-ha.level = DEBUG
 logger.camel-impl-ha.name = org.apache.camel.impl.ha
-logger.camel-impl-ha.level = DEBUG
\ No newline at end of file
+logger.camel-impl-ha.level = DEBUG
+
+rootLogger.level = INFO
+#rootLogger.appenderRef.stdout.ref = out
+rootLogger.appenderRef.out.ref = file
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
----------------------------------------------------------------------
diff --git 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
 
b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
index acb1ad8..fd3ab73 100644
--- 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
+++ 
b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
@@ -99,6 +99,9 @@ final class ZooKeeperClusterView extends 
AbstractCamelClusterView {
             leaderSelector.interruptLeadership();
             leaderSelector.close();
             leaderSelector = null;
+
+            localMember.setMaster(false);
+            fireLeadershipChangedEvent(getMaster().orElse(null));
         }
     }
 
@@ -118,7 +121,7 @@ final class ZooKeeperClusterView extends 
AbstractCamelClusterView {
             }
             
             localMember.setMaster(false);
-            getMaster().ifPresent(leader -> 
fireLeadershipChangedEvent(leader));
+            fireLeadershipChangedEvent(getMaster().orElse(null));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/96a7cfdd/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java
----------------------------------------------------------------------
diff --git 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java
 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java
new file mode 100644
index 0000000..6be67f2
--- /dev/null
+++ 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.ha;
+
+import java.util.UUID;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.ExplicitCamelContextNameStrategy;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.main.Main;
+import org.apache.camel.main.MainListenerSupport;
+
+public final class ZooKeeperClusteredRoutePolicyMain {
+
+    public static void main(String[] args) throws Exception {
+        final  String id = UUID.randomUUID().toString();
+
+        Main main = new Main();
+        main.addMainListener(new MainListenerSupport() {
+            @Override
+            public void configure(CamelContext context) {
+                try {
+                    ZooKeeperClusterService service = new 
ZooKeeperClusterService();
+                    service.setId("node-" + id);
+                    service.setNodes(args[0]);
+                    service.setBasePath("/camel");
+
+                    context.setNameStrategy(new 
ExplicitCamelContextNameStrategy("camel-" + id));
+                    context.addService(service);
+                    
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        main.addRouteBuilder(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer:clustered?delay=1s&period=1s")
+                    .routeId("route-" + id)
+                    .log("Route ${routeId} is running ...");
+            }
+        });
+
+        main.run();
+    }
+}

Reply via email to