Improve ClusterView factory methods

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1bf1450a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1bf1450a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1bf1450a

Branch: refs/heads/master
Commit: 1bf1450a72731541522096837e98b8a51e81e68d
Parents: 83f5046
Author: lburgazzoli <lburgazz...@gmail.com>
Authored: Thu Sep 28 14:33:10 2017 +0200
Committer: lburgazzoli <lburgazz...@gmail.com>
Committed: Thu Sep 28 15:33:18 2017 +0200

----------------------------------------------------------------------
 .../camel/impl/ha/ClusteredRouteController.java |   2 +-
 .../camel/impl/ha/ClusteredRoutePolicy.java     |  61 +++++++---
 ...FileLockClusteredRoutePolicyFactoryTest.java | 108 ++++++++++++++++
 .../ha/FileLockClusteredRoutePolicyTest.java    |   4 +-
 ...ixClientClusteredRoutePolicyFactoryMain.java |  68 +++++++++++
 .../AtomixClientClusteredRoutePolicyMain.java   |  68 -----------
 .../ha/AtomixClientRoutePolicyFactoryTest.java  |  34 ++++++
 ...omixClientRoutePolicyFactoryTestSupport.java | 121 ++++++++++++++++++
 .../atomix/ha/AtomixClientRoutePolicyTest.java  |   2 +-
 .../ha/AtomixClientRoutePolicyTestSupport.java  |   4 +-
 ...ixEphemeralClientRoutePolicyFactoryTest.java |  34 ++++++
 .../atomix/ha/AtomixRoutePolicyFactoryTest.java | 117 ++++++++++++++++++
 .../atomix/ha/AtomixRoutePolicyTest.java        |   4 +-
 .../component/consul/ha/ConsulClusterView.java  |   6 +-
 .../ha/ConsulClusteredRoutePolicyFactoryIT.java | 111 +++++++++++++++++
 .../ConsulClusteredRoutePolicyFactoryMain.java  |  64 ++++++++++
 .../consul/ha/ConsulClusteredRoutePolicyIT.java |   4 +-
 .../ha/ConsulClusteredRoutePolicyMain.java      |  64 ----------
 ...ooKeeperClusteredRoutePolicyFactoryMain.java |  65 ++++++++++
 ...ooKeeperClusteredRoutePolicyFactoryTest.java | 122 +++++++++++++++++++
 .../ha/ZooKeeperClusteredRoutePolicyMain.java   |  65 ----------
 .../ha/ZooKeeperClusteredRoutePolicyTest.java   |   4 +-
 22 files changed, 901 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java
index a07ed86..137fff5 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java
@@ -309,7 +309,7 @@ public class ClusteredRouteController extends DefaultRouteController {
                     final String namespace = ObjectHelper.supplyIfEmpty(configuration.getNamespace(), defaultConfiguration::getNamespace);
                     final Duration initialDelay = ObjectHelper.supplyIfEmpty(configuration.getInitialDelay(), defaultConfiguration::getInitialDelay);
 
-                    ClusteredRoutePolicy policy = ClusteredRoutePolicy.forView(clusterService.getView(namespace));
+                    ClusteredRoutePolicy policy = ClusteredRoutePolicy.forNamespace(clusterService, namespace);
                     policy.setCamelContext(getCamelContext());
                     policy.setInitialDelay(initialDelay);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
index 5b45cbc..b537ca7 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
@@ -40,6 +40,7 @@ import org.apache.camel.ha.CamelClusterView;
 import org.apache.camel.management.event.CamelContextStartedEvent;
 import org.apache.camel.support.EventNotifierSupport;
 import org.apache.camel.support.RoutePolicySupport;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ReferenceCount;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,17 +53,27 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca
     private final Set<Route> startedRoutes;
     private final Set<Route> stoppedRoutes;
     private final ReferenceCount refCount;
-    private final CamelClusterView clusterView;
     private final CamelClusterEventListener.Leadership leadershipEventListener;
     private final CamelContextStartupListener listener;
     private final AtomicBoolean contextStarted;
+
+    private final String namespace;
+    private final CamelClusterService.Selector clusterServiceSelector;
+    private CamelClusterService clusterService;
+    private CamelClusterView clusterView;
+
     private Duration initialDelay;
     private ScheduledExecutorService executorService;
 
     private CamelContext camelContext;
 
-    private ClusteredRoutePolicy(CamelClusterView clusterView) {
-        this.clusterView = clusterView;
+    private ClusteredRoutePolicy(CamelClusterService clusterService, CamelClusterService.Selector clusterServiceSelector, String namespace) {
+        this.namespace = namespace;
+        this.clusterService = clusterService;
+        this.clusterServiceSelector = clusterServiceSelector;
+
+        ObjectHelper.notNull(namespace, "Namespace");
+
         this.leadershipEventListener = new CamelClusterLeadershipListener();
 
         this.stoppedRoutes = new HashSet<>();
@@ -156,6 +167,24 @@ public final class ClusteredRoutePolicy extends 
RoutePolicySupport implements Ca
     }
 
     @Override
+    public void doStart() throws Exception {
+        if (clusterService == null) {
+            clusterService = ClusterServiceHelper.lookupService(camelContext, clusterServiceSelector).orElseThrow(
+                () -> new IllegalStateException("CamelCluster service not found")
+            );
+        }
+
+        LOGGER.debug("ClusteredRoutePolicy {} is using ClusterService instance {} (id={}, type={})",
+            this,
+            clusterService,
+            clusterService.getId(),
+            clusterService.getClass().getName()
+        );
+
+        clusterView = clusterService.getView(namespace);
+    }
+
+    @Override
     public void doShutdown() throws Exception {
         this.refCount.release();
     }
@@ -318,11 +347,10 @@ public final class ClusteredRoutePolicy extends 
RoutePolicySupport implements Ca
     // ****************************************************
 
     public static ClusteredRoutePolicy forNamespace(CamelContext camelContext, 
CamelClusterService.Selector selector, String namespace) throws Exception {
-        final CamelClusterService service = 
ClusterServiceHelper.lookupService(camelContext, selector).orElseThrow(
-            () -> new IllegalStateException("CamelCluster service not found")
-        );
+        ClusteredRoutePolicy policy = new ClusteredRoutePolicy(null, selector, 
namespace);
+        policy.setCamelContext(camelContext);
 
-        return forNamespace(service, namespace);
+        return policy;
     }
 
     public static ClusteredRoutePolicy forNamespace(CamelContext camelContext, 
String namespace) throws Exception {
@@ -330,21 +358,14 @@ public final class ClusteredRoutePolicy extends 
RoutePolicySupport implements Ca
     }
 
     public static ClusteredRoutePolicy forNamespace(CamelClusterService 
service, String namespace) throws Exception {
-        return forView(service.getView(namespace));
+        return new ClusteredRoutePolicy(service, 
ClusterServiceSelectors.DEFAULT_SELECTOR, namespace);
     }
 
-    public static ClusteredRoutePolicy forView(CamelClusterView view) throws 
Exception  {
-
-        ClusteredRoutePolicy policy = new ClusteredRoutePolicy(view);
-        policy.setCamelContext(view.getCamelContext());
-
-        LOGGER.debug("ClusteredRoutePolicy {} is using ClusterService instance 
{} (id={}, type={})",
-            policy,
-            view.getClusterService(),
-            view.getClusterService().getId(),
-            view.getClusterService().getClass().getName()
-        );
+    public static ClusteredRoutePolicy 
forNamespace(CamelClusterService.Selector selector, String namespace) throws 
Exception {
+        return new ClusteredRoutePolicy(null, selector, namespace);
+    }
 
-        return policy;
+    public static ClusteredRoutePolicy forNamespace(String namespace) throws 
Exception {
+        return forNamespace(ClusterServiceSelectors.DEFAULT_SELECTOR, 
namespace);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyFactoryTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyFactoryTest.java
new file mode 100644
index 0000000..a93b574
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyFactoryTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.ha;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class FileLockClusteredRoutePolicyFactoryTest {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileLockClusteredRoutePolicyFactoryTest.class);
+    private static final List<String> CLIENTS = IntStream.range(0, 
3).mapToObj(Integer::toString).collect(Collectors.toList());
+    private static final List<String> RESULTS = new ArrayList<>();
+    private static final ScheduledExecutorService SCHEDULER = 
Executors.newScheduledThreadPool(CLIENTS.size());
+    private static final CountDownLatch LATCH = new 
CountDownLatch(CLIENTS.size());
+
+    // ************************************
+    // Test
+    // ************************************
+
+    @Test
+    public void test() throws Exception {
+        for (String id : CLIENTS) {
+            SCHEDULER.submit(() -> run(id));
+        }
+
+        LATCH.await(1, TimeUnit.MINUTES);
+        SCHEDULER.shutdownNow();
+
+        Assert.assertEquals(CLIENTS.size(), RESULTS.size());
+        Assert.assertTrue(RESULTS.containsAll(CLIENTS));
+    }
+
+    // ************************************
+    // Run a Camel node
+    // ************************************
+
+    private static void run(String id) {
+        try {
+            int events = ThreadLocalRandom.current().nextInt(2, 6);
+            CountDownLatch contextLatch = new CountDownLatch(events);
+
+            FileLockClusterService service = new FileLockClusterService();
+            service.setId("node-" + id);
+            service.setRoot("target/ha");
+            service.setAcquireLockDelay(1, TimeUnit.SECONDS);
+            service.setAcquireLockInterval(1, TimeUnit.SECONDS);
+
+            DefaultCamelContext context = new DefaultCamelContext();
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(service);
+            
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("timer:file-lock?delay=1s&period=1s")
+                        .routeId("route-" + id)
+                        .log("From ${routeId}")
+                        .process(e -> contextLatch.countDown());
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+
+            LOGGER.debug("Shutting down node {}", id);
+            RESULTS.add(id);
+
+            context.stop();
+
+            LATCH.countDown();
+        } catch (Exception e) {
+            LOGGER.warn("", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyTest.java
index 3e19566..77fd3e2 100644
--- 
a/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyTest.java
@@ -28,7 +28,7 @@ import java.util.stream.IntStream;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.impl.ha.ClusteredRoutePolicy;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -77,12 +77,12 @@ public final class FileLockClusteredRoutePolicyTest {
             context.disableJMX();
             context.setName("context-" + id);
             context.addService(service);
-            
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
             context.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
                     from("timer:file-lock?delay=1s&period=1s")
                         .routeId("route-" + id)
+                        
.routePolicy(ClusteredRoutePolicy.forNamespace("my-ns"))
                         .log("From ${routeId}")
                         .process(e -> contextLatch.countDown());
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyFactoryMain.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyFactoryMain.java
 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyFactoryMain.java
new file mode 100644
index 0000000..d009595
--- /dev/null
+++ 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyFactoryMain.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.ha;
+
+import java.util.UUID;
+
+import io.atomix.catalyst.transport.Address;
+import io.atomix.copycat.server.storage.StorageLevel;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.ExplicitCamelContextNameStrategy;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.main.Main;
+import org.apache.camel.main.MainListenerSupport;
+
+public final class AtomixClientClusteredRoutePolicyFactoryMain {
+    private AtomixClientClusteredRoutePolicyFactoryMain() {
+    }
+
+    public static void main(String[] args) throws Exception {
+        final String id = UUID.randomUUID().toString();
+
+        Main main = new Main();
+        main.addMainListener(new MainListenerSupport() {
+            @Override
+            public void configure(CamelContext context) {
+                try {
+                    AtomixClusterService service = new AtomixClusterService();
+                    service.setId("node-" + id);
+                    service.setStorageLevel(StorageLevel.MEMORY);
+                    service.setAddress(new Address(args[0]));
+                    service.setNodes(args.length > 1 ? args[1] : args[0]);
+
+                    context.setNameStrategy(new 
ExplicitCamelContextNameStrategy("camel-" + id));
+                    context.addService(service);
+                    
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        main.addRouteBuilder(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer:clustered?delay=1s&period=1s")
+                    .routeId("route-" + id)
+                    .log("Route ${routeId} is running ...");
+            }
+        });
+
+        main.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java
 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java
deleted file mode 100644
index ada4ab9..0000000
--- 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.atomix.ha;
-
-import java.util.UUID;
-
-import io.atomix.catalyst.transport.Address;
-import io.atomix.copycat.server.storage.StorageLevel;
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ExplicitCamelContextNameStrategy;
-import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
-import org.apache.camel.main.Main;
-import org.apache.camel.main.MainListenerSupport;
-
-public final class AtomixClientClusteredRoutePolicyMain {
-    private AtomixClientClusteredRoutePolicyMain() {
-    }
-
-    public static void main(String[] args) throws Exception {
-        final String id = UUID.randomUUID().toString();
-
-        Main main = new Main();
-        main.addMainListener(new MainListenerSupport() {
-            @Override
-            public void configure(CamelContext context) {
-                try {
-                    AtomixClusterService service = new AtomixClusterService();
-                    service.setId("node-" + id);
-                    service.setStorageLevel(StorageLevel.MEMORY);
-                    service.setAddress(new Address(args[0]));
-                    service.setNodes(args.length > 1 ? args[1] : args[0]);
-
-                    context.setNameStrategy(new 
ExplicitCamelContextNameStrategy("camel-" + id));
-                    context.addService(service);
-                    
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
-
-        main.addRouteBuilder(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("timer:clustered?delay=1s&period=1s")
-                    .routeId("route-" + id)
-                    .log("Route ${routeId} is running ...");
-            }
-        });
-
-        main.run();
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTest.java
 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTest.java
new file mode 100644
index 0000000..f29ba20
--- /dev/null
+++ 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.ha;
+
+import java.util.Collections;
+
+import io.atomix.catalyst.transport.Address;
+import org.apache.camel.ha.CamelClusterService;
+
+public final class AtomixClientRoutePolicyFactoryTest extends 
AtomixClientRoutePolicyFactoryTestSupport {
+    @Override
+    protected CamelClusterService createClusterService(String id, Address 
bootstrapNode) {
+        AtomixClusterClientService service = new AtomixClusterClientService();
+        service.setId("node-" + id);
+        service.setNodes(Collections.singletonList(bootstrapNode));
+        service.setEphemeral(false);
+
+        return service;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTestSupport.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTestSupport.java
 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTestSupport.java
new file mode 100644
index 0000000..95a7cf1
--- /dev/null
+++ 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTestSupport.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.ha;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import io.atomix.AtomixReplica;
+import io.atomix.catalyst.transport.Address;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.atomix.client.AtomixFactory;
+import org.apache.camel.ha.CamelClusterService;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AtomixClientRoutePolicyFactoryTestSupport {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AtomixClientRoutePolicyFactoryTestSupport.class);
+
+    private final Address address = new Address("127.0.0.1", 
AvailablePortFinder.getNextAvailable());
+    private final List<String> clients = IntStream.range(0, 
3).mapToObj(Integer::toString).collect(Collectors.toList());
+    private final List<String> results = new ArrayList<>();
+    private final ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(clients.size() * 2);
+    private final CountDownLatch latch = new CountDownLatch(clients.size());
+
+    // ************************************
+    // Test
+    // ************************************
+
+    @Test
+    public void test() throws Exception {
+        AtomixReplica boot = null;
+
+        try {
+            boot = AtomixFactory.replica(address);
+
+            for (String id : clients) {
+                scheduler.submit(() -> run(id));
+            }
+
+            latch.await(1, TimeUnit.MINUTES);
+            scheduler.shutdownNow();
+
+            Assert.assertEquals(clients.size(), results.size());
+            Assert.assertTrue(results.containsAll(clients));
+        } finally {
+            if (boot != null) {
+                boot.shutdown();
+            }
+        }
+    }
+
+    // ************************************
+    // Run a Camel node
+    // ************************************
+
+    private void run(String id) {
+        try {
+            int events = ThreadLocalRandom.current().nextInt(2, 6);
+            CountDownLatch contextLatch = new CountDownLatch(events);
+
+            DefaultCamelContext context = new DefaultCamelContext();
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(createClusterService(id, address));
+            
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("timer:atomix?delay=1s&period=1s")
+                        .routeId("route-" + id)
+                        .log("From ${routeId}")
+                        .process(e -> contextLatch.countDown());
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+
+            LOGGER.debug("Shutting down client node {}", id);
+            results.add(id);
+
+            context.stop();
+
+            latch.countDown();
+        } catch (Exception e) {
+            LOGGER.warn("", e);
+        }
+    }
+
+    protected abstract CamelClusterService createClusterService(String id, 
Address bootstrapNode);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java
 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java
index be12571..4c038ca 100644
--- 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java
+++ 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java
@@ -21,7 +21,7 @@ import java.util.Collections;
 import io.atomix.catalyst.transport.Address;
 import org.apache.camel.ha.CamelClusterService;
 
-public final class AtomixClientRoutePolicyTest  extends 
AtomixClientRoutePolicyTestSupport {
+public final class AtomixClientRoutePolicyTest extends 
AtomixClientRoutePolicyTestSupport {
     @Override
     protected CamelClusterService createClusterService(String id, Address 
bootstrapNode) {
         AtomixClusterClientService service = new AtomixClusterClientService();

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java
 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java
index d5b0f8b..5e1ff18 100644
--- 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java
+++ 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java
@@ -32,7 +32,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.atomix.client.AtomixFactory;
 import org.apache.camel.ha.CamelClusterService;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.impl.ha.ClusteredRoutePolicy;
 import org.apache.camel.test.AvailablePortFinder;
 import org.junit.Assert;
 import org.junit.Test;
@@ -88,12 +88,12 @@ public abstract class AtomixClientRoutePolicyTestSupport {
             context.disableJMX();
             context.setName("context-" + id);
             context.addService(createClusterService(id, address));
-            
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
             context.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
                     from("timer:atomix?delay=1s&period=1s")
                         .routeId("route-" + id)
+                        
.routePolicy(ClusteredRoutePolicy.forNamespace("my-ns"))
                         .log("From ${routeId}")
                         .process(e -> contextLatch.countDown());
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixEphemeralClientRoutePolicyFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixEphemeralClientRoutePolicyFactoryTest.java
 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixEphemeralClientRoutePolicyFactoryTest.java
new file mode 100644
index 0000000..9bf6c75
--- /dev/null
+++ 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixEphemeralClientRoutePolicyFactoryTest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.ha;
+
+import java.util.Collections;
+
+import io.atomix.catalyst.transport.Address;
+import org.apache.camel.ha.CamelClusterService;
+
+public final class AtomixEphemeralClientRoutePolicyFactoryTest extends 
AtomixClientRoutePolicyFactoryTestSupport {
+    @Override
+    protected CamelClusterService createClusterService(String id, Address 
bootstrapNode) {
+        AtomixClusterClientService service = new AtomixClusterClientService();
+        service.setId("node-" + id);
+        service.setNodes(Collections.singletonList(bootstrapNode));
+        service.setEphemeral(true);
+
+        return service;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyFactoryTest.java
 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyFactoryTest.java
new file mode 100644
index 0000000..4643791
--- /dev/null
+++ 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyFactoryTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.ha;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import io.atomix.catalyst.transport.Address;
+import io.atomix.copycat.server.storage.StorageLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class AtomixRoutePolicyFactoryTest {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AtomixRoutePolicyFactoryTest.class);
+
+    private final List<Address> addresses = Arrays.asList(
+        new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()),
+        new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()),
+        new Address("127.0.0.1", AvailablePortFinder.getNextAvailable())
+    );
+
+    private final Set<Address> results = new HashSet<>();
+    private final ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(addresses.size());
+    private final CountDownLatch latch = new CountDownLatch(addresses.size());
+
+    // ************************************
+    // Test
+    // ************************************
+
+    @Test
+    public void test() throws Exception {
+        for (Address address: addresses) {
+            scheduler.submit(() -> run(address));
+        }
+
+        latch.await(1, TimeUnit.MINUTES);
+        scheduler.shutdownNow();
+
+        Assert.assertEquals(addresses.size(), results.size());
+        Assert.assertTrue(results.containsAll(addresses));
+    }
+
+    // ************************************
+    // Run a Camel node
+    // ************************************
+
+    private void run(Address address) {
+        try {
+            int events = ThreadLocalRandom.current().nextInt(2, 6);
+            CountDownLatch contextLatch = new CountDownLatch(events);
+
+            AtomixClusterService service = new AtomixClusterService();
+            service.setId("node-" + address.port());
+            service.setStorageLevel(StorageLevel.MEMORY);
+            service.setAddress(address);
+            service.setNodes(addresses);
+
+            DefaultCamelContext context = new DefaultCamelContext();
+            context.disableJMX();
+            context.setName("context-" + address.port());
+            context.addService(service);
+            
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("timer:atomix?delay=1s&period=1s")
+                        .routeId("route-" + address.port())
+                        .log("From ${routeId}")
+                        .process(e -> contextLatch.countDown());
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+
+            LOGGER.debug("Shutting down node {}", address);
+            results.add(address);
+
+            context.stop();
+
+            latch.countDown();
+        } catch (Exception e) {
+            LOGGER.warn("", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java
 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java
index e730291..cbb4d21 100644
--- 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java
+++ 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java
@@ -30,7 +30,7 @@ import io.atomix.catalyst.transport.Address;
 import io.atomix.copycat.server.storage.StorageLevel;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.impl.ha.ClusteredRoutePolicy;
 import org.apache.camel.test.AvailablePortFinder;
 import org.junit.Assert;
 import org.junit.Test;
@@ -86,12 +86,12 @@ public final class AtomixRoutePolicyTest {
             context.disableJMX();
             context.setName("context-" + address.port());
             context.addService(service);
-            
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
             context.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
                     from("timer:atomix?delay=1s&period=1s")
                         .routeId("route-" + address.port())
+                        
.routePolicy(ClusteredRoutePolicy.forNamespace("my-ns"))
                         .log("From ${routeId}")
                         .process(e -> contextLatch.countDown());
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java
----------------------------------------------------------------------
diff --git 
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java
 
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java
index c5971461..b0d9fea 100644
--- 
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java
+++ 
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java
@@ -298,8 +298,10 @@ final class ConsulClusterView extends 
AbstractCamelClusterView {
                     this
                 );
 
-                // Refresh session
-                sessionClient.renewSession(sessionId.get());
+                if (sessionId.get() != null) {
+                    // Refresh session
+                    sessionClient.renewSession(sessionId.get());
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryIT.java
----------------------------------------------------------------------
diff --git 
a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryIT.java
 
b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryIT.java
new file mode 100644
index 0000000..ad912f6
--- /dev/null
+++ 
b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryIT.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.ha;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.orbitz.consul.Consul;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsulClusteredRoutePolicyFactoryIT {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsulClusteredRoutePolicyFactoryIT.class);
+    private static final List<String> CLIENTS = IntStream.range(0, 
3).mapToObj(Integer::toString).collect(Collectors.toList());
+    private static final List<String> RESULTS = new ArrayList<>();
+    private static final ScheduledExecutorService SCHEDULER = 
Executors.newScheduledThreadPool(CLIENTS.size() * 2);
+    private static final CountDownLatch LATCH = new 
CountDownLatch(CLIENTS.size());
+    private static final String CONSUL_HOST = 
System.getProperty("camel.consul.host", Consul.DEFAULT_HTTP_HOST);
+    private static final int CONSUL_PORT = 
Integer.getInteger("camel.consul.port", Consul.DEFAULT_HTTP_PORT);
+
+    // ************************************
+    // Test
+    // ************************************
+
+    @Test
+    public void test() throws Exception {
+        for (String id : CLIENTS) {
+            SCHEDULER.submit(() -> run(id));
+        }
+
+        LATCH.await(1, TimeUnit.MINUTES);
+        SCHEDULER.shutdownNow();
+
+        Assert.assertEquals(CLIENTS.size(), RESULTS.size());
+        Assert.assertTrue(RESULTS.containsAll(CLIENTS));
+    }
+
+    // ************************************
+    // Run a Camel node
+    // ************************************
+
+    private static void run(String id) {
+        try {
+            int events = ThreadLocalRandom.current().nextInt(2, 6);
+            CountDownLatch contextLatch = new CountDownLatch(events);
+
+            ConsulClusterService service = new ConsulClusterService();
+            service.setId("node-" + id);
+            service.setUrl(String.format("http://%s:%d", CONSUL_HOST, CONSUL_PORT));
+
+            LOGGER.info("Consul URL {}", service.getUrl());
+
+            DefaultCamelContext context = new DefaultCamelContext();
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(service);
+            
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("timer:consul?delay=1s&period=1s")
+                        .routeId("route-" + id)
+                        .log("From ${routeId}")
+                        .process(e -> contextLatch.countDown());
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+
+            LOGGER.debug("Shutting down node {}", id);
+            RESULTS.add(id);
+
+            context.stop();
+
+            LATCH.countDown();
+        } catch (Exception e) {
+            LOGGER.warn("", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryMain.java
----------------------------------------------------------------------
diff --git 
a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryMain.java
 
b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryMain.java
new file mode 100644
index 0000000..8179ea7
--- /dev/null
+++ 
b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryMain.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.ha;
+
+import java.util.UUID;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.ExplicitCamelContextNameStrategy;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.main.Main;
+import org.apache.camel.main.MainListenerSupport;
+
+public final class ConsulClusteredRoutePolicyFactoryMain {
+    private ConsulClusteredRoutePolicyFactoryMain() {
+    }
+
+    public static void main(String[] args) throws Exception {
+        final String id = UUID.randomUUID().toString();
+
+        Main main = new Main();
+        main.addMainListener(new MainListenerSupport() {
+            @Override
+            public void configure(CamelContext context) {
+                try {
+                    ConsulClusterService service = new ConsulClusterService();
+                    service.setId("node-" + id);
+                    service.setUrl(args[0]);
+
+                    context.setNameStrategy(new 
ExplicitCamelContextNameStrategy("camel-" + id));
+                    context.addService(service);
+                    
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        main.addRouteBuilder(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer:clustered?delay=1s&period=1s")
+                    .routeId("route-" + id)
+                    .log("Route ${routeId} is running ...");
+            }
+        });
+
+        main.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java
----------------------------------------------------------------------
diff --git 
a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java
 
b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java
index 2a79682..2c003a6 100644
--- 
a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java
+++ 
b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java
@@ -29,7 +29,7 @@ import java.util.stream.IntStream;
 import com.orbitz.consul.Consul;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.impl.ha.ClusteredRoutePolicy;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -80,12 +80,12 @@ public class ConsulClusteredRoutePolicyIT {
             context.disableJMX();
             context.setName("context-" + id);
             context.addService(service);
-            
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
             context.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
                     from("timer:consul?delay=1s&period=1s")
                         .routeId("route-" + id)
+                        
.routePolicy(ClusteredRoutePolicy.forNamespace("my-ns"))
                         .log("From ${routeId}")
                         .process(e -> contextLatch.countDown());
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java
----------------------------------------------------------------------
diff --git 
a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java
 
b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java
deleted file mode 100644
index 6f0cb00..0000000
--- 
a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.consul.ha;
-
-import java.util.UUID;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ExplicitCamelContextNameStrategy;
-import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
-import org.apache.camel.main.Main;
-import org.apache.camel.main.MainListenerSupport;
-
-public final class ConsulClusteredRoutePolicyMain {
-    private ConsulClusteredRoutePolicyMain() {        
-    }
-
-    public static void main(String[] args) throws Exception {
-        final String id = UUID.randomUUID().toString();
-
-        Main main = new Main();
-        main.addMainListener(new MainListenerSupport() {
-            @Override
-            public void configure(CamelContext context) {
-                try {
-                    ConsulClusterService service = new ConsulClusterService();
-                    service.setId("node-" + id);
-                    service.setUrl(args[0]);
-
-                    context.setNameStrategy(new 
ExplicitCamelContextNameStrategy("camel-" + id));
-                    context.addService(service);
-                    
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
-
-        main.addRouteBuilder(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("timer:clustered?delay=1s&period=1s")
-                    .routeId("route-" + id)
-                    .log("Route ${routeId} is running ...");
-            }
-        });
-
-        main.run();
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryMain.java
----------------------------------------------------------------------
diff --git 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryMain.java
 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryMain.java
new file mode 100644
index 0000000..0d455dd
--- /dev/null
+++ 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryMain.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.ha;
+
+import java.util.UUID;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.ExplicitCamelContextNameStrategy;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.main.Main;
+import org.apache.camel.main.MainListenerSupport;
+
+public final class ZooKeeperClusteredRoutePolicyFactoryMain {
+    private ZooKeeperClusteredRoutePolicyFactoryMain() {
+    }
+
+    public static void main(String[] args) throws Exception {
+        final String id = UUID.randomUUID().toString();
+
+        Main main = new Main();
+        main.addMainListener(new MainListenerSupport() {
+            @Override
+            public void configure(CamelContext context) {
+                try {
+                    ZooKeeperClusterService service = new 
ZooKeeperClusterService();
+                    service.setId("node-" + id);
+                    service.setNodes(args[0]);
+                    service.setBasePath("/camel");
+
+                    context.setNameStrategy(new 
ExplicitCamelContextNameStrategy("camel-" + id));
+                    context.addService(service);
+                    
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        main.addRouteBuilder(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer:clustered?delay=1s&period=1s")
+                    .routeId("route-" + id)
+                    .log("Route ${routeId} is running ...");
+            }
+        });
+
+        main.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryTest.java
 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryTest.java
new file mode 100644
index 0000000..a58cc02
--- /dev/null
+++ 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.ha;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import 
org.apache.camel.component.zookeeper.ZooKeeperTestSupport.TestZookeeperServer;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Runs several Camel nodes against one ZooKeeper test server and checks that every node completes its timer route. */
+public final class ZooKeeperClusteredRoutePolicyFactoryTest {
+    private static final int PORT = AvailablePortFinder.getNextAvailable();
+    private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperClusteredRoutePolicyFactoryTest.class);
+    private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList());
+    private static final List<String> RESULTS = java.util.Collections.synchronizedList(new ArrayList<>()); // appended to concurrently by the SCHEDULER threads
+    private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2);
+    private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size());
+
+    /**
+     * Starts a ZooKeeper test server, runs one node per entry of CLIENTS and expects every id to end up in RESULTS.
+     */
+    @Test
+    public void test() throws Exception {
+        TestZookeeperServer server = null;
+
+        try {
+            server = new TestZookeeperServer(PORT, true);
+            ZooKeeperTestSupport.waitForServerUp("localhost:" + PORT, 1000);
+
+            for (String id : CLIENTS) {
+                SCHEDULER.submit(() -> run(id));
+            }
+
+            // Nodes count down even on failure, so a timeout here means a node never finished.
+            Assert.assertTrue("Nodes did not finish within one minute", LATCH.await(1, TimeUnit.MINUTES));
+            Assert.assertEquals(CLIENTS.size(), RESULTS.size());
+            Assert.assertTrue(RESULTS.containsAll(CLIENTS));
+        } finally {
+            SCHEDULER.shutdownNow();
+            if (server != null) {
+                server.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Runs a single Camel node: waits for a random number of timer events, records {@code id} in RESULTS and
+     * stops the context. Once started, the context is always stopped, and LATCH is counted down even on failure.
+     */
+    private static void run(String id) {
+        try {
+            int events = ThreadLocalRandom.current().nextInt(2, 6);
+            CountDownLatch contextLatch = new CountDownLatch(events);
+
+            ZooKeeperClusterService service = new ZooKeeperClusterService();
+            service.setId("node-" + id);
+            service.setNodes("localhost:" + PORT);
+            service.setBasePath("/camel");
+
+            DefaultCamelContext context = new DefaultCamelContext();
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(service);
+            context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("timer:zookeeper?delay=1s&period=1s")
+                        .routeId("route-" + id)
+                        .log("From ${routeId}")
+                        .process(e -> contextLatch.countDown());
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            try {
+                context.start();
+                contextLatch.await();
+                LOGGER.debug("Shutting down node {}", id);
+                RESULTS.add(id);
+            } finally {
+                context.stop();
+            }
+        } catch (Exception e) {
+            LOGGER.warn("Node {} failed", id, e);
+        } finally {
+            LATCH.countDown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java
deleted file mode 100644
index 7b263c5..0000000
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.ha;
-
-import java.util.UUID;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ExplicitCamelContextNameStrategy;
-import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
-import org.apache.camel.main.Main;
-import org.apache.camel.main.MainListenerSupport;
-
-public final class ZooKeeperClusteredRoutePolicyMain {
-    private ZooKeeperClusteredRoutePolicyMain() {        
-    }
-
-    public static void main(String[] args) throws Exception {
-        final String id = UUID.randomUUID().toString();
-
-        Main main = new Main();
-        main.addMainListener(new MainListenerSupport() {
-            @Override
-            public void configure(CamelContext context) {
-                try {
-                    ZooKeeperClusterService service = new 
ZooKeeperClusterService();
-                    service.setId("node-" + id);
-                    service.setNodes(args[0]);
-                    service.setBasePath("/camel");
-
-                    context.setNameStrategy(new 
ExplicitCamelContextNameStrategy("camel-" + id));
-                    context.addService(service);
-                    
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
-
-        main.addRouteBuilder(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("timer:clustered?delay=1s&period=1s")
-                    .routeId("route-" + id)
-                    .log("Route ${routeId} is running ...");
-            }
-        });
-
-        main.run();
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java
index 6aa7469..6f60f06 100644
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java
@@ -30,7 +30,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
 import 
org.apache.camel.component.zookeeper.ZooKeeperTestSupport.TestZookeeperServer;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.impl.ha.ClusteredRoutePolicy;
 import org.apache.camel.test.AvailablePortFinder;
 import org.junit.Assert;
 import org.junit.Test;
@@ -91,12 +91,12 @@ public final class ZooKeeperClusteredRoutePolicyTest {
             context.disableJMX();
             context.setName("context-" + id);
             context.addService(service);
-            
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
             context.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
                     from("timer:zookeeper?delay=1s&period=1s")
                         .routeId("route-" + id)
+                        
.routePolicy(ClusteredRoutePolicy.forNamespace("my-ns"))
                         .log("From ${routeId}")
                         .process(e -> contextLatch.countDown());
                 }

Reply via email to