CAMEL-10320: Camel Master component for clustered services

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1c8bca8e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1c8bca8e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1c8bca8e

Branch: refs/heads/master
Commit: 1c8bca8e842bc2fd85bd865f8c6968ddda8329d6
Parents: 14f2391
Author: lburgazzoli <lburgazz...@gmail.com>
Authored: Mon Sep 25 17:05:25 2017 +0200
Committer: lburgazzoli <lburgazz...@gmail.com>
Committed: Wed Sep 27 16:17:01 2017 +0200

----------------------------------------------------------------------
 bom/camel-bom/pom.xml                           |  10 ++
 components/camel-consul/pom.xml                 |   5 +
 .../component/consul/ha/ConsulMasterIT.java     | 109 +++++++++++++
 .../src/test/resources/log4j2.properties        |   2 +
 components/camel-master/pom.xml                 |   9 +-
 .../src/main/docs/master-component.adoc         | 122 ++++++++------
 .../camel/component/master/MasterComponent.java |  59 ++++++-
 .../camel/component/master/MasterConsumer.java  |  34 ++--
 .../camel/component/master/MasterEndpoint.java  |  21 ++-
 .../component/master/MasterComponentTest.java   |  40 +++--
 .../master/util/InMemoryClusterMember.java      |  49 ------
 .../master/util/InMemoryClusterService.java     |  47 ------
 .../master/util/InMemoryClusterView.java        | 159 -------------------
 .../src/test/resources/jgroups-tcp.xml          |  53 -------
 .../src/test/resources/log4j2.properties        |   5 +-
 ...ooKeeperClusteredRouteConfigurationTest.java |  26 +++
 .../zookeeper/ha/ZooKeeperMasterMain.java       |  12 +-
 .../zookeeper/ha/ZooKeeperMasterTest.java       |  20 +--
 ...ZooKeeperClusteredRouteConfigurationTest.xml |  42 +++++
 .../MasterComponentConfiguration.java           |  33 +++-
 .../camel-spring-boot-dependencies/pom.xml      |  10 ++
 21 files changed, 443 insertions(+), 424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/bom/camel-bom/pom.xml
----------------------------------------------------------------------
diff --git a/bom/camel-bom/pom.xml b/bom/camel-bom/pom.xml
index 4e9a433..175fd27 100644
--- a/bom/camel-bom/pom.xml
+++ b/bom/camel-bom/pom.xml
@@ -1576,6 +1576,16 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-master</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>camel-master-starter</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-metrics</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-consul/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-consul/pom.xml b/components/camel-consul/pom.xml
index 8133b47..cbe3a26 100644
--- a/components/camel-consul/pom.xml
+++ b/components/camel-consul/pom.xml
@@ -71,6 +71,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
+      <artifactId>camel-master</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
       <artifactId>camel-ribbon</artifactId>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulMasterIT.java
----------------------------------------------------------------------
diff --git 
a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulMasterIT.java
 
b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulMasterIT.java
new file mode 100644
index 0000000..5dbbb5c
--- /dev/null
+++ 
b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulMasterIT.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.ha;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.orbitz.consul.Consul;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsulMasterIT {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsulMasterIT.class);
+    private static final List<String> CLIENTS = IntStream.range(0, 
3).mapToObj(Integer::toString).collect(Collectors.toList());
+    private static final List<String> RESULTS = new ArrayList<>();
+    private static final ScheduledExecutorService SCHEDULER = 
Executors.newScheduledThreadPool(CLIENTS.size() * 2);
+    private static final CountDownLatch LATCH = new 
CountDownLatch(CLIENTS.size());
+    private static final String CONSUL_HOST = 
System.getProperty("camel.consul.host", Consul.DEFAULT_HTTP_HOST);
+    private static final int CONSUL_PORT = 
Integer.getInteger("camel.consul.port", Consul.DEFAULT_HTTP_PORT);
+
+    // ************************************
+    // Test
+    // ************************************
+
+    @Test
+    public void test() throws Exception {
+        for (String id : CLIENTS) {
+            SCHEDULER.submit(() -> run(id));
+        }
+
+        LATCH.await(1, TimeUnit.MINUTES);
+        SCHEDULER.shutdownNow();
+
+        Assert.assertEquals(CLIENTS.size(), RESULTS.size());
+        Assert.assertTrue(RESULTS.containsAll(CLIENTS));
+    }
+
+    // ************************************
+    // Run a Camel node
+    // ************************************
+
+    private static void run(String id) {
+        try {
+            int events = ThreadLocalRandom.current().nextInt(2, 6);
+            CountDownLatch contextLatch = new CountDownLatch(events);
+
+            ConsulClusterService service = new ConsulClusterService();
+            service.setId("node-" + id);
+            service.setUrl(String.format("http://%s:%d", CONSUL_HOST, CONSUL_PORT));
+
+            LOGGER.info("Consul URL {}", service.getUrl());
+
+            DefaultCamelContext context = new DefaultCamelContext();
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(service);
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("master:my-ns:timer:consul?delay=1s&period=1s")
+                        .routeId("route-" + id)
+                        .log("From ${routeId}")
+                        .process(e -> contextLatch.countDown());
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+
+            LOGGER.debug("Shutting down node {}", id);
+            RESULTS.add(id);
+
+            context.stop();
+
+            LATCH.countDown();
+        } catch (Exception e) {
+            LOGGER.warn("", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-consul/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/resources/log4j2.properties 
b/components/camel-consul/src/test/resources/log4j2.properties
index a4da2bf..059cf25 100644
--- a/components/camel-consul/src/test/resources/log4j2.properties
+++ b/components/camel-consul/src/test/resources/log4j2.properties
@@ -40,6 +40,8 @@ logger.camel-ha.name = org.apache.camel.ha
 logger.camel-ha.level = DEBUG
 logger.camel-impl-ha.name = org.apache.camel.impl.ha
 logger.camel-impl-ha.level = DEBUG
+logger.camel-master.name = org.apache.camel.component.master
+logger.camel-master.level = DEBUG
 
 rootLogger.level = INFO
 #rootLogger.appenderRef.stdout.ref = out

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-master/pom.xml b/components/camel-master/pom.xml
index 58390b4..f103c8e 100644
--- a/components/camel-master/pom.xml
+++ b/components/camel-master/pom.xml
@@ -34,8 +34,7 @@
 
   <properties>
     <camel.osgi.import>
-      !com.google.common.base;,
-      !org.apache.camel.component.master.group,
+      !org.apache.camel.component.master,
       *
     </camel.osgi.import>
     <camel.osgi.export.pkg>
@@ -57,12 +56,6 @@
       <artifactId>camel-test</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.jgroups</groupId>
-      <artifactId>jgroups</artifactId>
-      <version>${jgroups-version}</version>
-      <scope>test</scope>
-    </dependency>
 
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/main/docs/master-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-master/src/main/docs/master-component.adoc 
b/components/camel-master/src/main/docs/master-component.adoc
index bce8b37..a5969bd 100644
--- a/components/camel-master/src/main/docs/master-component.adoc
+++ b/components/camel-master/src/main/docs/master-component.adoc
@@ -2,7 +2,7 @@
 
 *Available as of Camel version 2.20*
 
-The **zookeeper-master:** endpoint provides a way to ensure only a single 
consumer in a cluster consumes from a given endpoint;
+The **camel-master:** endpoint provides a way to ensure only a single consumer 
in a cluster consumes from a given endpoint;
 with automatic failover if that JVM dies.
 
 This can be very useful if you need to consume from some legacy back end which 
either doesn't support concurrent
@@ -10,11 +10,11 @@ consumption or due to commercial or stability reasons you 
can only have a single
 
 ### Using the master endpoint
 
-Just prefix any camel endpoint with **zookeeper-master:someName:** where 
_someName_ is a logical name and is
+Just prefix any camel endpoint with **master:someName:** where _someName_ is a 
logical name and is
 used to acquire the master lock. e.g.
 
 ```
-from("zookeeper-master:cheese:jms:foo").to("activemq:wine");
+from("master:cheese:jms:foo").to("activemq:wine");
 ```
 The above simulates the [Exclusive 
Consumers](http://activemq.apache.org/exclusive-consumer.html) type feature in
 ActiveMQ; but on any third party JMS provider which maybe doesn't support 
exclusive consumers.
@@ -24,7 +24,7 @@ ActiveMQ; but on any third party JMS provider which maybe 
doesn't support exclus
 
 [source]
 ----
-zookeeper-master:name:endpoint[?options]
+master:namespace:endpoint[?options]
 ----
 
 Where endpoint is any Camel endpoint you want to run in master/slave mode.
@@ -33,7 +33,17 @@ Where endpoint is any Camel endpoint you want to run in 
master/slave mode.
 ### Options
 
 // component options: START
-The Master component has no options.
+The Master component supports 3 options which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *service* (advanced) | Inject the service to use. |  | CamelClusterService
+| *serviceSelector* (advanced) | Inject the service selector used to lookup 
the CamelClusterService to use. |  | Selector
+| *resolveProperty Placeholders* (advanced) | Whether the component should 
resolve property placeholders on itself when starting. Only properties which 
are of String type can use property placeholders. | true | boolean
+|===
 // component options: END
 
 // endpoint options: START
@@ -73,59 +83,77 @@ You can protect a clustered Camel application to only 
consume files from one act
 
 [source,java]
 ----
-    // the file endpoint we want to consume from
-    String url = "file:target/inbox?delete=true";
-
-    // use the zookeeper master component in the clustered group named myGroup
-    // to run a master/slave mode in the following Camel url
-    from("zookeeper-master:myGroup:" + url)
-        .log(name + " - Received file: ${file:name}")
-        .delay(delay)
-        .log(name + " - Done file:     ${file:name}")
-        .to("file:target/outbox");
+// the file endpoint we want to consume from
+String url = "file:target/inbox?delete=true";
+
+// use the camel master component in the clustered group named myGroup
+// to run a master/slave mode in the following Camel url
+from("master:myGroup:" + url)
+    .log(name + " - Received file: ${file:name}")
+    .delay(delay)
+    .log(name + " - Done file:     ${file:name}")
+    .to("file:target/outbox");
 ----
 
-ZooKeeper will by default connect to `localhost:2181`, but you can configure 
this on the component level.
+The master component leverages CamelClusterService you can configure using
 
+* *Java*
++
 [source,java]
 ----
-    MasterComponent master = new MasterComponent();
-    master.setZooKeeperUrl("myzookeeper:2181");
-----
-
-However you can also configure the url of the ZooKeeper ensemble using 
environment variables.
+ZooKeeperClusterService service = new ZooKeeperClusterService();
+service.setId("camel-node-1");
+service.setNodes("myzk:2181");
+service.setBasePath("/camel/ha");
 
-    export ZOOKEEPER_URL = "myzookeeper:2181"
-
-## Master RoutePolicy
+context.addService(service)
+----
 
-You can also use a `RoutePolicy` to control routes in master/slave mode.
+* *Xml (Spring/Blueprint)*
++
+[source,xml]
+----
+<beans xmlns="http://www.springframework.org/schema/beans"
+   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="
+     http://www.springframework.org/schema/beans
+     http://www.springframework.org/schema/beans/spring-beans.xsd
+     http://camel.apache.org/schema/spring
+     http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+
+  <bean id="ha" 
class="org.apache.camel.component.zookeeper.ha.ZooKeeperClusterService">
+    <property name="id" value="camel-node-1"/>
+    <property name="basePath" value="/camel/ha"/>
+    <property name="nodes" value="myzk:2181"/>
+  </bean>
+
+  <camelContext xmlns="http://camel.apache.org/schema/spring" autoStartup="false">
+    ...
+  </camelContext>
+
+</beans>
+----
 
-When doing so you must configure the route policy with
+* *Spring boot*
++
+[source,properties]
+----
+camel.component.zookeeper.cluster.service.enabled   = true
+camel.component.zookeeper.cluster.service.id        = camel-node-1
+camel.component.zookeeper.cluster.service.base-path = /camel/ha
+camel.component.zookeeper.cluster.service.nodes     = myzk:2181
+----
 
-- url to zookeeper ensemble
-- name of cluster group
-- *important* and set the route to not auto startup
+### Implementations
 
-A little example
+Camel provide the following ClusterService implementations:
 
-[source,java]
-----
-    MasterRoutePolicy master = new MasterRoutePolicy();
-    master.setZooKeeperUrl("localhost:2181");
-    master.setGroupName("myGroup");
-
-    // its import to set the route to not auto startup
-    // as we let the route policy start/stop the routes when it becomes a 
master/slave etc
-    from("file:target/inbox?delete=true").noAutoStartup()
-        // use the zookeeper master route policy in the clustered group
-        // to run this route in master/slave mode
-        .routePolicy(master)
-        .log(name + " - Received file: ${file:name}")
-        .delay(delay)
-        .log(name + " - Done file:     ${file:name}")
-        .to("file:target/outbox");
-----
+- camel-atomix
+- camel-consul
+- camel-file
+- camel-kubernetes
+- camel-zookeeper
 
 ### See Also
 

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
 
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
index 3c10d9d..6d2b3dc 100644
--- 
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
+++ 
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
@@ -20,7 +20,11 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.ha.CamelClusterService;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.impl.ha.ClusterServiceHelper;
+import org.apache.camel.impl.ha.ClusterServiceSelectors;
+import org.apache.camel.spi.Metadata;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StringHelper;
 
@@ -31,6 +35,10 @@ import org.apache.camel.util.StringHelper;
  * a single consumer.
  */
 public class MasterComponent extends DefaultComponent {
+    @Metadata(label = "advanced")
+    private CamelClusterService service;
+    @Metadata(label = "advanced")
+    private CamelClusterService.Selector serviceSelector;
 
     public MasterComponent() {
         this(null);
@@ -38,6 +46,8 @@ public class MasterComponent extends DefaultComponent {
 
     public MasterComponent(CamelContext context) {
         super(context);
+
+        this.serviceSelector = ClusterServiceSelectors.DEFAULT_SELECTOR;
     }
 
     @Override
@@ -55,6 +65,53 @@ public class MasterComponent extends DefaultComponent {
             delegateUri = delegateUri + "?" + uri.substring(uri.indexOf('?') + 
1);
         }
 
-        return new MasterEndpoint(uri, this, namespace, delegateUri);
+        return new MasterEndpoint(
+            uri,
+            this,
+            getClusterService(),
+            namespace,
+            delegateUri
+        );
+    }
+
+    public CamelClusterService getService() {
+        return service;
+    }
+
+    /**
+     * Inject the service to use.
+     */
+    public void setService(CamelClusterService service) {
+        this.service = service;
+    }
+
+    public CamelClusterService.Selector getServiceSelector() {
+        return serviceSelector;
+    }
+
+    /**
+     *
+     * Inject the service selector used to lookup the {@link 
CamelClusterService} to use.
+     */
+    public void setServiceSelector(CamelClusterService.Selector 
serviceSelector) {
+        this.serviceSelector = serviceSelector;
+    }
+
+    // ********************************
+    // Helpers
+    // ********************************
+
+    private CamelClusterService getClusterService() throws Exception {
+        if (service == null) {
+            CamelContext context = getCamelContext();
+
+            ObjectHelper.notNull(context, "Camel Context");
+
+            service = ClusterServiceHelper.lookupService(context, 
serviceSelector).orElseThrow(
+                () -> new IllegalStateException("No cluster service found")
+            );
+        }
+
+        return service;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
 
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index b693464..999ac71 100644
--- 
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++ 
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -16,7 +16,8 @@
  */
 package org.apache.camel.component.master;
 
-import org.apache.camel.CamelContext;
+import java.util.Optional;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
@@ -36,19 +37,20 @@ import org.slf4j.LoggerFactory;
 
 @ManagedResource(description = "Managed Master Consumer")
 public class MasterConsumer extends DefaultConsumer {
-    private static final transient Logger LOGER = 
LoggerFactory.getLogger(MasterConsumer.class);
+    private static final transient Logger LOGGER = 
LoggerFactory.getLogger(MasterConsumer.class);
 
+    private final CamelClusterService clusterService;
     private final MasterEndpoint masterEndpoint;
     private final Endpoint delegatedEndpoint;
     private final Processor processor;
     private final CamelClusterEventListener.Leadership leadershipListener;
     private Consumer delegatedConsumer;
-    private CamelClusterService service;
-    private CamelClusterView view;
+    private volatile CamelClusterView view;
 
-    public MasterConsumer(MasterEndpoint masterEndpoint, Processor processor) {
+    public MasterConsumer(MasterEndpoint masterEndpoint, Processor processor, 
CamelClusterService clusterService) {
         super(masterEndpoint, processor);
 
+        this.clusterService = clusterService;
         this.masterEndpoint = masterEndpoint;
         this.delegatedEndpoint = masterEndpoint.getEndpoint();
         this.processor = processor;
@@ -59,14 +61,9 @@ public class MasterConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
 
-        CamelContext context = super.getEndpoint().getCamelContext();
-        service = context.hasService(CamelClusterService.class);
-
-        if (service == null) {
-            throw new IllegalStateException("No cluster service found");
-        }
+        LOGGER.debug("Using ClusterService instance {} (id={}, type={})", 
clusterService, clusterService.getId(), clusterService.getClass().getName());
 
-        view = service.getView(masterEndpoint.getNamespace());
+        view = clusterService.getView(masterEndpoint.getNamespace());
         view.addEventListener(leadershipListener);
 
         if (isMaster()) {
@@ -80,6 +77,9 @@ public class MasterConsumer extends DefaultConsumer {
 
         if (view != null) {
             view.removeEventListener(leadershipListener);
+            clusterService.releaseView(view);
+
+            view = null;
         }
 
         ServiceHelper.stopAndShutdownServices(delegatedConsumer);
@@ -107,7 +107,7 @@ public class MasterConsumer extends DefaultConsumer {
     @ManagedAttribute(description = "Are we the master")
     public boolean isMaster() {
         return view != null
-            ? view.getLocalMember().isMaster()
+            ? view.getLocalMember().isLeader()
             : false;
     }
 
@@ -132,7 +132,7 @@ public class MasterConsumer extends DefaultConsumer {
         ServiceHelper.startService(delegatedEndpoint);
         ServiceHelper.startService(delegatedConsumer);
 
-        LOGER.info("Leadership taken: consumer started: {}", 
delegatedEndpoint);
+        LOGGER.info("Leadership taken: consumer started: {}", 
delegatedEndpoint);
     }
 
     private synchronized void onLeadershipLost() throws Exception {
@@ -141,7 +141,7 @@ public class MasterConsumer extends DefaultConsumer {
 
         delegatedConsumer = null;
 
-        LOGER.info("Leadership lost: consumer stopped: {}", delegatedEndpoint);
+        LOGGER.info("Leadership lost: consumer stopped: {}", 
delegatedEndpoint);
     }
 
     // **************************************
@@ -150,13 +150,13 @@ public class MasterConsumer extends DefaultConsumer {
 
     private final class LeadershipListener implements 
CamelClusterEventListener.Leadership {
         @Override
-        public void leadershipChanged(CamelClusterView view, 
CamelClusterMember leader) {
+        public void leadershipChanged(CamelClusterView view, 
Optional<CamelClusterMember> leader) {
             if (!isRunAllowed()) {
                 return;
             }
 
             try {
-                if (view.getLocalMember().isMaster()) {
+                if (view.getLocalMember().isLeader()) {
                     onLeadershipTaken();
                 } else if (delegatedConsumer != null) {
                     onLeadershipLost();

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java
 
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java
index 2b63597..10547a5 100644
--- 
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java
+++ 
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java
@@ -23,14 +23,22 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.ha.CamelClusterService;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriPath;
 
 @ManagedResource(description = "Managed Master Endpoint")
-@UriEndpoint(firstVersion = "2.20.0", scheme = "master", syntax = 
"master:namespace:delegateUri", consumerClass = MasterConsumer.class, 
consumerOnly = true,
-    title = "Master", lenientProperties = true, label = "clustering")
+@UriEndpoint(
+    firstVersion = "2.20.0",
+    scheme = "master",
+    syntax = "master:namespace:delegateUri",
+    consumerClass = MasterConsumer.class,
+    consumerOnly = true,
+    title = "Master",
+    lenientProperties = true,
+    label = "clustering")
 public class MasterEndpoint extends DefaultEndpoint implements 
DelegateEndpoint {
 
     private final Endpoint delegateEndpoint;
@@ -43,8 +51,12 @@ public class MasterEndpoint extends DefaultEndpoint 
implements DelegateEndpoint
     @Metadata(required = "true")
     private final String delegateUri;
 
-    public MasterEndpoint(String uri, MasterComponent component, String 
namespace, String delegateUri) {
+    private final CamelClusterService clusterService;
+
+    public MasterEndpoint(String uri, MasterComponent component, 
CamelClusterService clusterService, String namespace, String delegateUri) {
         super(uri, component);
+
+        this.clusterService = clusterService;
         this.namespace = namespace;
         this.delegateUri = delegateUri;
         this.delegateEndpoint = getCamelContext().getEndpoint(delegateUri);
@@ -52,12 +64,13 @@ public class MasterEndpoint extends DefaultEndpoint 
implements DelegateEndpoint
 
     @Override
     public Producer createProducer() throws Exception {
+        getComponent();
         throw new UnsupportedOperationException("Cannot produce from this 
endpoint");
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new MasterConsumer(this, processor);
+        return new MasterConsumer(this, processor, clusterService);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java
 
b/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java
index bbd8ce3..c252ccc 100644
--- 
a/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java
+++ 
b/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java
@@ -27,9 +27,8 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.master.util.InMemoryClusterService;
+import org.apache.camel.component.file.ha.FileLockClusterService;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.test.AvailablePortFinder;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -39,17 +38,14 @@ import org.slf4j.LoggerFactory;
 public class MasterComponentTest {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MasterComponentTest.class);
     private static final List<String> INSTANCES = IntStream.range(0, 
3).mapToObj(Integer::toString).collect(Collectors.toList());
-    private static final List<Integer> PORTS = INSTANCES.stream().map(i -> 
AvailablePortFinder.getNextAvailable()).collect(Collectors.toList());
     private static final List<String> RESULTS = new ArrayList<>();
-    private static final ScheduledExecutorService SCHEDULER = 
Executors.newScheduledThreadPool(INSTANCES.size() * 2);
+    private static final ScheduledExecutorService SCHEDULER = 
Executors.newScheduledThreadPool(INSTANCES.size());
     private static final CountDownLatch LATCH = new 
CountDownLatch(INSTANCES.size());
 
     @Test
     public void test()  throws Exception {
-        for (int i = 0; i < INSTANCES.size(); i++) {
-            int index = i;
-            SCHEDULER.submit(() -> run(INSTANCES.get(index), index, PORTS));
-            Thread.sleep(1000);
+        for (String instance: INSTANCES) {
+            SCHEDULER.submit(() -> run(instance));
         }
 
         LATCH.await(1, TimeUnit.MINUTES);
@@ -63,13 +59,16 @@ public class MasterComponentTest {
     // Run a Camel node
     // ************************************
 
-    private static void run(String id, int index, List<Integer> ports) {
+    private static void run(String id) {
         try {
-            CountDownLatch contextLatch = new CountDownLatch(1);
+            int events = ThreadLocalRandom.current().nextInt(2, 6);
+            CountDownLatch contextLatch = new CountDownLatch(events);
 
-            InMemoryClusterService service = new InMemoryClusterService();
-            service.setIndex(index);
-            service.setPorts(ports);
+            FileLockClusterService service = new FileLockClusterService();
+            service.setId(id);
+            service.setRoot("target/ha");
+            service.setAcquireLockDelay(1, TimeUnit.SECONDS);
+            service.setAcquireLockInterval(1, TimeUnit.SECONDS);
 
             DefaultCamelContext context = new DefaultCamelContext();
             context.disableJMX();
@@ -78,15 +77,10 @@ public class MasterComponentTest {
             context.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
-                    
from("master:ns:timer:test?delay=1s&period=1s&repeatCount=1")
+                    from("master:ns:timer:test?delay=1s&period=1s")
                         .routeId("route-" + id)
-                        .process(e -> {
-                            LOGGER.info("Node {} done", id);
-                            RESULTS.add(id);
-                            // Shutdown the context later on to give a chance to
-                            // other members to catch-up
-                            SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
-                        });
+                        .log("From ${routeId}")
+                        .process(e -> contextLatch.countDown());
                 }
             });
 
@@ -96,6 +90,10 @@ public class MasterComponentTest {
             context.start();
 
             contextLatch.await();
+
+            LOGGER.debug("Shutting down node {}", id);
+            RESULTS.add(id);
+
             context.stop();
 
             LATCH.countDown();

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterMember.java
----------------------------------------------------------------------
diff --git 
a/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterMember.java
 
b/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterMember.java
deleted file mode 100644
index cfe3fcc..0000000
--- 
a/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterMember.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.master.util;
-
-import java.util.List;
-
-import org.apache.camel.ha.CamelClusterMember;
-import org.apache.camel.util.ObjectHelper;
-import org.jgroups.Address;
-import org.jgroups.JChannel;
-import org.jgroups.View;
-
-public class InMemoryClusterMember implements CamelClusterMember {
-    private final JChannel channel;
-    private final View view;
-
-    public InMemoryClusterMember(JChannel channel) {
-        this.channel = channel;
-        this.view = channel.getView();
-    }
-
-    @Override
-    public boolean isMaster() {
-        final List<Address> members = view.getMembers();
-
-        return ObjectHelper.isNotEmpty(members)
-            ? members.get(0).equals(channel.getAddress())
-            : false;
-    }
-
-    @Override
-    public String getId() {
-        return channel.name();
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterService.java
----------------------------------------------------------------------
diff --git 
a/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterService.java
 
b/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterService.java
deleted file mode 100644
index 7b40a72..0000000
--- 
a/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterService.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.master.util;
-
-import java.util.List;
-
-import org.apache.camel.impl.ha.AbstractCamelClusterService;
-
-public class InMemoryClusterService extends 
AbstractCamelClusterService<InMemoryClusterView> {
-    private int index;
-    private List<Integer> ports;
-
-    @Override
-    protected InMemoryClusterView createView(String namespace) throws 
Exception {
-        return new InMemoryClusterView(this, namespace);
-    }
-
-    public int getIndex() {
-        return index;
-    }
-
-    public void setIndex(int index) {
-        this.index = index;
-    }
-
-    public List<Integer> getPorts() {
-        return ports;
-    }
-
-    public void setPorts(List<Integer> ports) {
-        this.ports = ports;
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterView.java
----------------------------------------------------------------------
diff --git 
a/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterView.java
 
b/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterView.java
deleted file mode 100644
index 41728e1..0000000
--- 
a/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterView.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.master.util;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
-import static java.util.stream.Collectors.toList;
-
-import org.apache.camel.ha.CamelClusterMember;
-import org.apache.camel.ha.CamelClusterService;
-import org.apache.camel.impl.ha.AbstractCamelClusterView;
-import org.apache.camel.util.ObjectHelper;
-import org.jgroups.Address;
-import org.jgroups.JChannel;
-import org.jgroups.PhysicalAddress;
-import org.jgroups.ReceiverAdapter;
-import org.jgroups.View;
-import org.jgroups.protocols.TCP;
-import org.jgroups.protocols.TCPPING;
-import org.jgroups.stack.IpAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class InMemoryClusterView extends AbstractCamelClusterView {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(InMemoryClusterView.class);
-
-    private JChannel channel;
-
-    protected InMemoryClusterView(CamelClusterService cluster, String 
namespace) {
-        super(cluster, namespace);
-    }
-
-    @Override
-    public Optional<CamelClusterMember> getMaster() {
-        return channel != null
-            ? Optional.of(new InMemoryClusterMember(channel))
-            : Optional.empty();
-    }
-
-    @Override
-    public CamelClusterMember getLocalMember() {
-        return new LocalClusterMember();
-    }
-
-    @Override
-    public List<CamelClusterMember> getMembers() {
-        if (channel != null) {
-            channel.getView().getMembers().stream()
-                .map(ClusterMember::new)
-                .collect(toList());
-        }
-
-        return Collections.emptyList();
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        final int index = 
getClusterService().unwrap(InMemoryClusterService.class).getIndex();
-        final List<Integer> ports = 
getClusterService().unwrap(InMemoryClusterService.class).getPorts();
-        final List<PhysicalAddress> addresses = new ArrayList<>();
-
-        for (Integer port: ports) {
-            addresses.add(new IpAddress("127.0.0.1", port));
-        }
-
-        this.channel = new 
JChannel(getClass().getResourceAsStream("/jgroups-tcp.xml"));
-
-        TCP tcp = this.channel.getProtocolStack().findProtocol(TCP.class);
-        tcp.setBindAddress(InetAddress.getByName("127.0.0.1"));
-        tcp.setBindPort(ports.get(index));
-
-        TCPPING tcpping  = 
this.channel.getProtocolStack().findProtocol(TCPPING.class);
-        tcpping.setInitialHosts(addresses);
-
-        this.channel.setReceiver(new ReceiverAdapter() {
-            @Override
-            public void viewAccepted(View view) {
-                fireLeadershipChangedEvent(new 
ClusterMember(view.getMembers().get(0)));
-            }
-        });
-
-        this.channel.connect(getNamespace());
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        if (channel != null) {
-            channel.close();
-        }
-    }
-
-    // ***********************************
-    //
-    // ***********************************
-
-    private class LocalClusterMember implements CamelClusterMember {
-        @Override
-        public boolean isMaster() {
-            if (channel == null) {
-                return false;
-            }
-
-            List<Address> members = channel.view().getMembers();
-
-            if (ObjectHelper.isNotEmpty(members)) {
-                LOGGER.info("master={}, channel={}, members={}", 
members.get(0), channel.getAddress(), members);
-                return members.get(0).equals(channel.getAddress());
-            }
-
-            return false;
-        }
-
-        @Override
-        public String getId() {
-            return channel != null ? channel.getAddressAsString() : "local";
-        }
-    }
-
-    private class ClusterMember implements CamelClusterMember {
-        private final Address address;
-
-        public ClusterMember(Address address) {
-            this.address = address;
-        }
-
-        @Override
-        public boolean isMaster() {
-            final List<Address> members = channel.view().getMembers();
-
-            return ObjectHelper.isNotEmpty(members)
-                ? members.get(0).equals(address)
-                : false;
-        }
-
-        @Override
-        public String getId() {
-            return channel.getAddressAsString();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/test/resources/jgroups-tcp.xml
----------------------------------------------------------------------
diff --git a/components/camel-master/src/test/resources/jgroups-tcp.xml 
b/components/camel-master/src/test/resources/jgroups-tcp.xml
deleted file mode 100644
index 3e6d58d..0000000
--- a/components/camel-master/src/test/resources/jgroups-tcp.xml
+++ /dev/null
@@ -1,53 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-         http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
--->
-<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-        xmlns="urn:org:jgroups"
-        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
-  <TCP bind_port="7800"
-       recv_buf_size="${tcp.recv_buf_size:130k}"
-       send_buf_size="${tcp.send_buf_size:130k}"
-       max_bundle_size="64K"
-       sock_conn_timeout="300"
-
-       thread_pool.min_threads="0"
-       thread_pool.max_threads="20"
-       thread_pool.keep_alive_time="30000"/>
-
-  <TCPPING async_discovery="true"
-           
initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7800],localhost[7801]}"
-           port_range="2"/>
-  <MERGE3  min_interval="10000"
-           max_interval="30000"/>
-  <FD_SOCK/>
-  <FD timeout="3000" max_tries="3" />
-  <VERIFY_SUSPECT timeout="1500"  />
-  <BARRIER />
-  <pbcast.NAKACK2 use_mcast_xmit="false"
-                  discard_delivered_msgs="true"/>
-  <UNICAST3 />
-  <pbcast.STABLE desired_avg_gossip="50000"
-                 max_bytes="4M"/>
-  <pbcast.GMS print_local_addr="true" join_timeout="2000"/>
-  <MFC max_credits="2M"
-       min_threshold="0.4"/>
-  <FRAG2 frag_size="60K"  />
-  <!--RSVP resend_interval="2000" timeout="10000"/-->
-  <pbcast.STATE_TRANSFER/>
-</config>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-master/src/test/resources/log4j2.properties 
b/components/camel-master/src/test/resources/log4j2.properties
index e5a24c1f..40a55e4 100644
--- a/components/camel-master/src/test/resources/log4j2.properties
+++ b/components/camel-master/src/test/resources/log4j2.properties
@@ -25,6 +25,9 @@ appender.out.name = out
 appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = [%t] %c{1} %-5p %m%n
 
+
+logger.camel-file-ha.name = org.apache.camel.component.file.ha
+logger.camel-file-ha.level = DEBUG
 logger.camel-master.name = org.apache.camel.component.master
 logger.camel-master.level = DEBUG
 logger.camel.name = org.apache.camel
@@ -34,7 +37,7 @@ logger.springframework.name = org.springframework
 logger.springframework.level = WARN
 
 rootLogger.level = INFO
-rootLogger.appenderRef.stdout.ref = out
+#rootLogger.appenderRef.stdout.ref = out
 rootLogger.appenderRef.file.ref = file
 
 

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.java
 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.java
new file mode 100644
index 0000000..f02df61
--- /dev/null
+++ 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.java
@@ -0,0 +1,26 @@
+package org.apache.camel.component.zookeeper.ha;
+
+import org.apache.camel.ha.CamelClusterService;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class SpringZooKeeperClusteredRouteConfigurationTest extends 
CamelSpringTestSupport {
+
+    @Test
+    public void test() {
+        assertNotNull(context.hasService(CamelClusterService.class));
+        
assertTrue(context.getRoutePolicyFactories().stream().anyMatch(ClusteredRoutePolicyFactory.class::isInstance));
+    }
+
+    // ***********************
+    // Routes
+    // ***********************
+
+    @Override
+    protected AbstractApplicationContext createApplicationContext() {
+        return new 
ClassPathXmlApplicationContext("org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.xml");
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterMain.java
----------------------------------------------------------------------
diff --git 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterMain.java
 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterMain.java
index 148a5dd..1c70ff0 100644
--- 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterMain.java
+++ 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterMain.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.ExplicitCamelContextNameStrategy;
 import org.apache.camel.main.Main;
 import org.apache.camel.main.MainListenerSupport;
 
@@ -29,16 +30,17 @@ public final class ZooKeeperMasterMain {
         final String nodeId = UUID.randomUUID().toString();
         final String address = args[0];
 
-        ZooKeeperClusterService service = new ZooKeeperClusterService();
-        service.setId("node-" + nodeId);
-        service.setNodes(address);
-        service.setBasePath("/camel/master");
-
         Main main = new Main();
         main.addMainListener(new MainListenerSupport() {
             @Override
             public void configure(CamelContext context) {
                 try {
+                    ZooKeeperClusterService service = new 
ZooKeeperClusterService();
+                    service.setId("node-" + nodeId);
+                    service.setNodes(address);
+                    service.setBasePath("/camel/master");
+
+                    context.setNameStrategy(new 
ExplicitCamelContextNameStrategy("camel-" + nodeId));
                     context.addService(service);
                 } catch (Exception e) {
                     throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterTest.java
 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterTest.java
index 948834a..3e50bd2 100644
--- 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterTest.java
+++ 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterTest.java
@@ -41,7 +41,7 @@ public final class ZooKeeperMasterTest {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ZooKeeperMasterTest.class);
     private static final List<String> CLIENTS = IntStream.range(0, 
3).mapToObj(Integer::toString).collect(Collectors.toList());
     private static final List<String> RESULTS = new ArrayList<>();
-    private static final ScheduledExecutorService SCHEDULER = 
Executors.newScheduledThreadPool(CLIENTS.size() * 2);
+    private static final ScheduledExecutorService SCHEDULER = 
Executors.newScheduledThreadPool(CLIENTS.size());
     private static final CountDownLatch LATCH = new 
CountDownLatch(CLIENTS.size());
 
     // ************************************
@@ -78,7 +78,8 @@ public final class ZooKeeperMasterTest {
 
     private static void run(String id) {
         try {
-            CountDownLatch contextLatch = new CountDownLatch(1);
+            int events = ThreadLocalRandom.current().nextInt(2, 6);
+            CountDownLatch contextLatch = new CountDownLatch(events);
 
             ZooKeeperClusterService service = new ZooKeeperClusterService();
             service.setId("node-" + id);
@@ -92,15 +93,10 @@ public final class ZooKeeperMasterTest {
             context.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
-                    
from("master:zk:timer:master?delay=1s&period=1s&repeatCount=1")
+                    from("master:zk:timer:master?delay=1s&period=1s")
                         .routeId("route-" + id)
-                        .process(e -> {
-                            LOGGER.debug("Node {} done", id);
-                            RESULTS.add(id);
-                            // Shutdown the context later on to give a chance to
-                            // other members to catch-up
-                            SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
-                        });
+                        .log("From ${routeId}")
+                        .process(e -> contextLatch.countDown());
                 }
             });
 
@@ -110,6 +106,10 @@ public final class ZooKeeperMasterTest {
             context.start();
 
             contextLatch.await();
+
+            LOGGER.debug("Shutting down node {}", id);
+            RESULTS.add(id);
+
             context.stop();
 
             LATCH.countDown();

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.xml
----------------------------------------------------------------------
diff --git 
a/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.xml
 
b/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.xml
new file mode 100644
index 0000000..2ab9589
--- /dev/null
+++ 
b/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+         http://www.springframework.org/schema/beans
+         http://www.springframework.org/schema/beans/spring-beans.xsd
+         http://camel.apache.org/schema/spring
+         http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+
+  <bean id="cluster-service" class="org.apache.camel.component.zookeeper.ha.ZooKeeperClusterService">
+    <property name="id" value="node-1"/>
+    <property name="basePath" value="/camel/ha"/>
+    <property name="nodes" value="localhost:2181"/>
+  </bean>
+
+  <bean id="cluster-policy" class="org.apache.camel.impl.ha.ClusteredRoutePolicyFactory" factory-method="forNamespace">
+    <constructor-arg value="my-ns"/>
+  </bean>
+
+  <camelContext xmlns="http://camel.apache.org/schema/spring" autoStartup="false">
+  </camelContext>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/platforms/spring-boot/components-starter/camel-master-starter/src/main/java/org/apache/camel/component/master/springboot/MasterComponentConfiguration.java
----------------------------------------------------------------------
diff --git 
a/platforms/spring-boot/components-starter/camel-master-starter/src/main/java/org/apache/camel/component/master/springboot/MasterComponentConfiguration.java
 
b/platforms/spring-boot/components-starter/camel-master-starter/src/main/java/org/apache/camel/component/master/springboot/MasterComponentConfiguration.java
index 9c104c1..3a1de95 100644
--- 
a/platforms/spring-boot/components-starter/camel-master-starter/src/main/java/org/apache/camel/component/master/springboot/MasterComponentConfiguration.java
+++ 
b/platforms/spring-boot/components-starter/camel-master-starter/src/main/java/org/apache/camel/component/master/springboot/MasterComponentConfiguration.java
@@ -17,12 +17,14 @@
 package org.apache.camel.component.master.springboot;
 
 import javax.annotation.Generated;
+import org.apache.camel.ha.CamelClusterService;
+import org.apache.camel.ha.CamelClusterService.Selector;
 import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon;
 import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.NestedConfigurationProperty;
 
 /**
- * Represents an endpoint which only becomes active when it obtains the master
- * lock
+ * Camel Master Support
  * 
  * Generated by camel-package-maven-plugin - do not edit this file!
  */
@@ -33,12 +35,39 @@ public class MasterComponentConfiguration
             ComponentConfigurationPropertiesCommon {
 
     /**
+     * Inject the service to use.
+     */
+    @NestedConfigurationProperty
+    private CamelClusterService service;
+    /**
+     * Inject the service selector used to lookup the CamelClusterService to
+     * use.
+     */
+    @NestedConfigurationProperty
+    private Selector serviceSelector;
+    /**
     * Whether the component should resolve property placeholders on itself when
      * starting. Only properties which are of String type can use property
      * placeholders.
      */
     private Boolean resolvePropertyPlaceholders = true;
 
+    public CamelClusterService getService() {
+        return service;
+    }
+
+    public void setService(CamelClusterService service) {
+        this.service = service;
+    }
+
+    public Selector getServiceSelector() {
+        return serviceSelector;
+    }
+
+    public void setServiceSelector(Selector serviceSelector) {
+        this.serviceSelector = serviceSelector;
+    }
+
     public Boolean getResolvePropertyPlaceholders() {
         return resolvePropertyPlaceholders;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml
----------------------------------------------------------------------
diff --git 
a/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml 
b/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml
index 46c4fab..7171372 100644
--- 
a/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml
+++ 
b/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml
@@ -1756,6 +1756,16 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-master</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>camel-master-starter</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-metrics</artifactId>
         <version>${project.version}</version>
       </dependency>

Reply via email to