Improve ClusterView factory methods
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1bf1450a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1bf1450a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1bf1450a Branch: refs/heads/master Commit: 1bf1450a72731541522096837e98b8a51e81e68d Parents: 83f5046 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Thu Sep 28 14:33:10 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Thu Sep 28 15:33:18 2017 +0200 ---------------------------------------------------------------------- .../camel/impl/ha/ClusteredRouteController.java | 2 +- .../camel/impl/ha/ClusteredRoutePolicy.java | 61 +++++++--- ...FileLockClusteredRoutePolicyFactoryTest.java | 108 ++++++++++++++++ .../ha/FileLockClusteredRoutePolicyTest.java | 4 +- ...ixClientClusteredRoutePolicyFactoryMain.java | 68 +++++++++++ .../AtomixClientClusteredRoutePolicyMain.java | 68 ----------- .../ha/AtomixClientRoutePolicyFactoryTest.java | 34 ++++++ ...omixClientRoutePolicyFactoryTestSupport.java | 121 ++++++++++++++++++ .../atomix/ha/AtomixClientRoutePolicyTest.java | 2 +- .../ha/AtomixClientRoutePolicyTestSupport.java | 4 +- ...ixEphemeralClientRoutePolicyFactoryTest.java | 34 ++++++ .../atomix/ha/AtomixRoutePolicyFactoryTest.java | 117 ++++++++++++++++++ .../atomix/ha/AtomixRoutePolicyTest.java | 4 +- .../component/consul/ha/ConsulClusterView.java | 6 +- .../ha/ConsulClusteredRoutePolicyFactoryIT.java | 111 +++++++++++++++++ .../ConsulClusteredRoutePolicyFactoryMain.java | 64 ++++++++++ .../consul/ha/ConsulClusteredRoutePolicyIT.java | 4 +- .../ha/ConsulClusteredRoutePolicyMain.java | 64 ---------- ...ooKeeperClusteredRoutePolicyFactoryMain.java | 65 ++++++++++ ...ooKeeperClusteredRoutePolicyFactoryTest.java | 122 +++++++++++++++++++ .../ha/ZooKeeperClusteredRoutePolicyMain.java | 65 ---------- .../ha/ZooKeeperClusteredRoutePolicyTest.java | 4 +- 22 files changed, 901 insertions(+), 231 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java index a07ed86..137fff5 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java @@ -309,7 +309,7 @@ public class ClusteredRouteController extends DefaultRouteController { final String namespace = ObjectHelper.supplyIfEmpty(configuration.getNamespace(), defaultConfiguration::getNamespace); final Duration initialDelay = ObjectHelper.supplyIfEmpty(configuration.getInitialDelay(), defaultConfiguration::getInitialDelay); - ClusteredRoutePolicy policy = ClusteredRoutePolicy.forView(clusterService.getView(namespace)); + ClusteredRoutePolicy policy = ClusteredRoutePolicy.forNamespace(clusterService, namespace); policy.setCamelContext(getCamelContext()); policy.setInitialDelay(initialDelay); http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java index 5b45cbc..b537ca7 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java @@ -40,6 +40,7 @@ import org.apache.camel.ha.CamelClusterView; import org.apache.camel.management.event.CamelContextStartedEvent; import org.apache.camel.support.EventNotifierSupport; import org.apache.camel.support.RoutePolicySupport; +import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ReferenceCount; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,17 +53,27 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca private final Set<Route> startedRoutes; private final Set<Route> stoppedRoutes; private final ReferenceCount refCount; - private final CamelClusterView clusterView; private final CamelClusterEventListener.Leadership leadershipEventListener; private final CamelContextStartupListener listener; private final AtomicBoolean contextStarted; + + private final String namespace; + private final CamelClusterService.Selector clusterServiceSelector; + private CamelClusterService clusterService; + private CamelClusterView clusterView; + private Duration initialDelay; private ScheduledExecutorService executorService; private CamelContext camelContext; - private ClusteredRoutePolicy(CamelClusterView clusterView) { - this.clusterView = clusterView; + private ClusteredRoutePolicy(CamelClusterService clusterService, CamelClusterService.Selector clusterServiceSelector, String namespace) { + this.namespace = namespace; + this.clusterService = clusterService; + this.clusterServiceSelector = clusterServiceSelector; + + ObjectHelper.notNull(namespace, "Namespace"); + this.leadershipEventListener = new CamelClusterLeadershipListener(); this.stoppedRoutes = new HashSet<>(); @@ -156,6 +167,24 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca } @Override + public void doStart() throws Exception { + if (clusterService == null) { + clusterService = ClusterServiceHelper.lookupService(camelContext, clusterServiceSelector).orElseThrow( + () -> new IllegalStateException("CamelCluster service not found") + ); + } + + LOGGER.debug("ClusteredRoutePolicy {} is using ClusterService instance {} (id={}, type={})", + this, + clusterService, + clusterService.getId(), + clusterService.getClass().getName() + ); + + clusterView = clusterService.getView(namespace); + } + + @Override public void doShutdown() throws Exception { this.refCount.release(); } @@ -318,11 +347,10 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca // **************************************************** public static ClusteredRoutePolicy forNamespace(CamelContext camelContext, CamelClusterService.Selector selector, String namespace) throws Exception { - final CamelClusterService service = ClusterServiceHelper.lookupService(camelContext, selector).orElseThrow( - () -> new IllegalStateException("CamelCluster service not found") - ); + ClusteredRoutePolicy policy = new ClusteredRoutePolicy(null, selector, namespace); + policy.setCamelContext(camelContext); - return forNamespace(service, namespace); + return policy; } public static ClusteredRoutePolicy forNamespace(CamelContext camelContext, String namespace) throws Exception { @@ -330,21 +358,14 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca } public static ClusteredRoutePolicy forNamespace(CamelClusterService service, String namespace) throws Exception { - return forView(service.getView(namespace)); + return new ClusteredRoutePolicy(service, ClusterServiceSelectors.DEFAULT_SELECTOR, namespace); } - public static ClusteredRoutePolicy forView(CamelClusterView view) throws Exception { - - ClusteredRoutePolicy policy = new ClusteredRoutePolicy(view); - policy.setCamelContext(view.getCamelContext()); - - LOGGER.debug("ClusteredRoutePolicy {} is using ClusterService instance {} (id={}, type={})", - policy, - view.getClusterService(), - view.getClusterService().getId(), - view.getClusterService().getClass().getName() - ); + public static ClusteredRoutePolicy forNamespace(CamelClusterService.Selector selector, String namespace) throws Exception { + return new ClusteredRoutePolicy(null, selector, namespace); + } - return policy; + public static ClusteredRoutePolicy forNamespace(String namespace) throws Exception { + return forNamespace(ClusterServiceSelectors.DEFAULT_SELECTOR, namespace); } } http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyFactoryTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyFactoryTest.java b/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyFactoryTest.java new file mode 100644 index 0000000..a93b574 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyFactoryTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.ha; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class FileLockClusteredRoutePolicyFactoryTest { + private static final Logger LOGGER = LoggerFactory.getLogger(FileLockClusteredRoutePolicyFactoryTest.class); + private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList()); + private static final List<String> RESULTS = new ArrayList<>(); + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size()); + private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size()); + + // ************************************ + // Test + // ************************************ + + @Test + public void test() throws Exception { + for (String id : CLIENTS) { + SCHEDULER.submit(() -> run(id)); + } + + LATCH.await(1, TimeUnit.MINUTES); + SCHEDULER.shutdownNow(); + + Assert.assertEquals(CLIENTS.size(), RESULTS.size()); + Assert.assertTrue(RESULTS.containsAll(CLIENTS)); + } + + // ************************************ + // Run a Camel node + // ************************************ + + private static void run(String id) { + try { + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); + + FileLockClusterService service = new FileLockClusterService(); + service.setId("node-" + id); + service.setRoot("target/ha"); + service.setAcquireLockDelay(1, TimeUnit.SECONDS); + service.setAcquireLockInterval(1, TimeUnit.SECONDS); + + DefaultCamelContext context = new DefaultCamelContext(); + context.disableJMX(); + context.setName("context-" + id); + context.addService(service); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:file-lock?delay=1s&period=1s") + .routeId("route-" + id) + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + + LOGGER.debug("Shutting down node {}", id); + RESULTS.add(id); + + context.stop(); + + LATCH.countDown(); + } catch (Exception e) { + LOGGER.warn("", e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyTest.java index 3e19566..77fd3e2 100644 --- a/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/file/ha/FileLockClusteredRoutePolicyTest.java @@ -28,7 +28,7 @@ import java.util.stream.IntStream; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.impl.ha.ClusteredRoutePolicy; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -77,12 +77,12 @@ public final class FileLockClusteredRoutePolicyTest { context.disableJMX(); context.setName("context-" + id); context.addService(service); - context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("timer:file-lock?delay=1s&period=1s") .routeId("route-" + id) + .routePolicy(ClusteredRoutePolicy.forNamespace("my-ns")) .log("From ${routeId}") .process(e -> contextLatch.countDown()); } http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyFactoryMain.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyFactoryMain.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyFactoryMain.java new file mode 100644 index 0000000..d009595 --- /dev/null +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyFactoryMain.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.ha; + +import java.util.UUID; + +import io.atomix.catalyst.transport.Address; +import io.atomix.copycat.server.storage.StorageLevel; +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.ExplicitCamelContextNameStrategy; +import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.main.Main; +import org.apache.camel.main.MainListenerSupport; + +public final class AtomixClientClusteredRoutePolicyFactoryMain { + private AtomixClientClusteredRoutePolicyFactoryMain() { + } + + public static void main(String[] args) throws Exception { + final String id = UUID.randomUUID().toString(); + + Main main = new Main(); + main.addMainListener(new MainListenerSupport() { + @Override + public void configure(CamelContext context) { + try { + AtomixClusterService service = new AtomixClusterService(); + service.setId("node-" + id); + service.setStorageLevel(StorageLevel.MEMORY); + service.setAddress(new Address(args[0])); + service.setNodes(args.length > 1 ? args[1] : args[0]); + + context.setNameStrategy(new ExplicitCamelContextNameStrategy("camel-" + id)); + context.addService(service); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + main.addRouteBuilder(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:clustered?delay=1s&period=1s") + .routeId("route-" + id) + .log("Route ${routeId} is running ..."); + } + }); + + main.run(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java deleted file mode 100644 index ada4ab9..0000000 --- a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientClusteredRoutePolicyMain.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.atomix.ha; - -import java.util.UUID; - -import io.atomix.catalyst.transport.Address; -import io.atomix.copycat.server.storage.StorageLevel; -import org.apache.camel.CamelContext; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.impl.ExplicitCamelContextNameStrategy; -import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; -import org.apache.camel.main.Main; -import org.apache.camel.main.MainListenerSupport; - -public final class AtomixClientClusteredRoutePolicyMain { - private AtomixClientClusteredRoutePolicyMain() { - } - - public static void main(String[] args) throws Exception { - final String id = UUID.randomUUID().toString(); - - Main main = new Main(); - main.addMainListener(new MainListenerSupport() { - @Override - public void configure(CamelContext context) { - try { - AtomixClusterService service = new AtomixClusterService(); - service.setId("node-" + id); - service.setStorageLevel(StorageLevel.MEMORY); - service.setAddress(new Address(args[0])); - service.setNodes(args.length > 1 ? args[1] : args[0]); - - context.setNameStrategy(new ExplicitCamelContextNameStrategy("camel-" + id)); - context.addService(service); - context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); - - main.addRouteBuilder(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("timer:clustered?delay=1s&period=1s") - .routeId("route-" + id) - .log("Route ${routeId} is running ..."); - } - }); - - main.run(); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTest.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTest.java new file mode 100644 index 0000000..f29ba20 --- /dev/null +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.ha; + +import java.util.Collections; + +import io.atomix.catalyst.transport.Address; +import org.apache.camel.ha.CamelClusterService; + +public final class AtomixClientRoutePolicyFactoryTest extends AtomixClientRoutePolicyFactoryTestSupport { + @Override + protected CamelClusterService createClusterService(String id, Address bootstrapNode) { + AtomixClusterClientService service = new AtomixClusterClientService(); + service.setId("node-" + id); + service.setNodes(Collections.singletonList(bootstrapNode)); + service.setEphemeral(false); + + return service; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTestSupport.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTestSupport.java new file mode 100644 index 0000000..95a7cf1 --- /dev/null +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyFactoryTestSupport.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.ha; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import io.atomix.AtomixReplica; +import io.atomix.catalyst.transport.Address; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.atomix.client.AtomixFactory; +import org.apache.camel.ha.CamelClusterService; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.test.AvailablePortFinder; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AtomixClientRoutePolicyFactoryTestSupport { + private static final Logger LOGGER = LoggerFactory.getLogger(AtomixClientRoutePolicyFactoryTestSupport.class); + + private final Address address = new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()); + private final List<String> clients = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList()); + private final List<String> results = new ArrayList<>(); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(clients.size() * 2); + private final CountDownLatch latch = new CountDownLatch(clients.size()); + + // ************************************ + // Test + // ************************************ + + @Test + public void test() throws Exception { + AtomixReplica boot = null; + + try { + boot = AtomixFactory.replica(address); + + for (String id : clients) { + scheduler.submit(() -> run(id)); + } + + latch.await(1, TimeUnit.MINUTES); + scheduler.shutdownNow(); + + Assert.assertEquals(clients.size(), results.size()); + Assert.assertTrue(results.containsAll(clients)); + } finally { + if (boot != null) { + boot.shutdown(); + } + } + } + + // ************************************ + // Run a Camel node + // ************************************ + + private void run(String id) { + try { + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); + + DefaultCamelContext context = new DefaultCamelContext(); + context.disableJMX(); + context.setName("context-" + id); + context.addService(createClusterService(id, address)); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:atomix?delay=1s&period=1s") + .routeId("route-" + id) + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + + LOGGER.debug("Shutting down client node {}", id); + results.add(id); + + context.stop(); + + latch.countDown(); + } catch (Exception e) { + LOGGER.warn("", e); + } + } + + protected abstract CamelClusterService createClusterService(String id, Address bootstrapNode); +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java index be12571..4c038ca 100644 --- a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java @@ -21,7 +21,7 @@ import java.util.Collections; import io.atomix.catalyst.transport.Address; import org.apache.camel.ha.CamelClusterService; -public final class AtomixClientRoutePolicyTest extends AtomixClientRoutePolicyTestSupport { +public final class AtomixClientRoutePolicyTest extends AtomixClientRoutePolicyTestSupport { @Override protected CamelClusterService createClusterService(String id, Address bootstrapNode) { AtomixClusterClientService service = new AtomixClusterClientService(); http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java index d5b0f8b..5e1ff18 100644 --- a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java @@ -32,7 +32,7 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.atomix.client.AtomixFactory; import org.apache.camel.ha.CamelClusterService; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.impl.ha.ClusteredRoutePolicy; import org.apache.camel.test.AvailablePortFinder; import org.junit.Assert; import org.junit.Test; @@ -88,12 +88,12 @@ public abstract class AtomixClientRoutePolicyTestSupport { context.disableJMX(); context.setName("context-" + id); context.addService(createClusterService(id, address)); - context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("timer:atomix?delay=1s&period=1s") .routeId("route-" + id) + .routePolicy(ClusteredRoutePolicy.forNamespace("my-ns")) .log("From ${routeId}") .process(e -> contextLatch.countDown()); } http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixEphemeralClientRoutePolicyFactoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixEphemeralClientRoutePolicyFactoryTest.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixEphemeralClientRoutePolicyFactoryTest.java new file mode 100644 index 0000000..9bf6c75 --- /dev/null +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixEphemeralClientRoutePolicyFactoryTest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.ha; + +import java.util.Collections; + +import io.atomix.catalyst.transport.Address; +import org.apache.camel.ha.CamelClusterService; + +public final class AtomixEphemeralClientRoutePolicyFactoryTest extends AtomixClientRoutePolicyFactoryTestSupport { + @Override + protected CamelClusterService createClusterService(String id, Address bootstrapNode) { + AtomixClusterClientService service = new AtomixClusterClientService(); + service.setId("node-" + id); + service.setNodes(Collections.singletonList(bootstrapNode)); + service.setEphemeral(true); + + return service; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyFactoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyFactoryTest.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyFactoryTest.java new file mode 100644 index 0000000..4643791 --- /dev/null +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyFactoryTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.ha; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import io.atomix.catalyst.transport.Address; +import io.atomix.copycat.server.storage.StorageLevel; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.test.AvailablePortFinder; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class AtomixRoutePolicyFactoryTest { + private static final Logger LOGGER = LoggerFactory.getLogger(AtomixRoutePolicyFactoryTest.class); + + private final List<Address> addresses = Arrays.asList( + new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()), + new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()), + new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()) + ); + + private final Set<Address> results = new HashSet<>(); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(addresses.size()); + private final CountDownLatch latch = new CountDownLatch(addresses.size()); + + // ************************************ + // Test + // ************************************ + + @Test + public void test() throws Exception { + for (Address address: addresses) { + scheduler.submit(() -> run(address)); + } + + latch.await(1, TimeUnit.MINUTES); + scheduler.shutdownNow(); + + Assert.assertEquals(addresses.size(), results.size()); + Assert.assertTrue(results.containsAll(addresses)); + } + + // ************************************ + // Run a Camel node + // ************************************ + + private void run(Address address) { + try { + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); + + AtomixClusterService service = new AtomixClusterService(); + service.setId("node-" + address.port()); + service.setStorageLevel(StorageLevel.MEMORY); + service.setAddress(address); + service.setNodes(addresses); + + DefaultCamelContext context = new DefaultCamelContext(); + context.disableJMX(); + context.setName("context-" + address.port()); + context.addService(service); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:atomix?delay=1s&period=1s") + .routeId("route-" + address.port()) + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + + LOGGER.debug("Shutting down node {}", address); + results.add(address); + + context.stop(); + + latch.countDown(); + } catch (Exception e) { + LOGGER.warn("", e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java index e730291..cbb4d21 100644 --- a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java @@ -30,7 +30,7 @@ import io.atomix.catalyst.transport.Address; import io.atomix.copycat.server.storage.StorageLevel; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.impl.ha.ClusteredRoutePolicy; import org.apache.camel.test.AvailablePortFinder; import org.junit.Assert; import org.junit.Test; @@ -86,12 +86,12 @@ public final class AtomixRoutePolicyTest { context.disableJMX(); context.setName("context-" + address.port()); context.addService(service); - context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("timer:atomix?delay=1s&period=1s") .routeId("route-" + address.port()) + .routePolicy(ClusteredRoutePolicy.forNamespace("my-ns")) .log("From ${routeId}") .process(e -> contextLatch.countDown()); } http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java index c5971461..b0d9fea 100644 --- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java @@ -298,8 +298,10 @@ final class ConsulClusterView extends AbstractCamelClusterView { this ); - // Refresh session - sessionClient.renewSession(sessionId.get()); + if (sessionId.get() != null) { + // Refresh session + sessionClient.renewSession(sessionId.get()); + } } } } http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryIT.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryIT.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryIT.java new file mode 100644 index 0000000..ad912f6 --- /dev/null +++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryIT.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.consul.ha; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.orbitz.consul.Consul; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConsulClusteredRoutePolicyFactoryIT { + private static final Logger LOGGER = LoggerFactory.getLogger(ConsulClusteredRoutePolicyFactoryIT.class); + private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList()); + private static final List<String> RESULTS = new ArrayList<>(); + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2); + private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size()); + private static final String CONSUL_HOST = System.getProperty("camel.consul.host", Consul.DEFAULT_HTTP_HOST); + private static final int CONSUL_PORT = Integer.getInteger("camel.consul.port", Consul.DEFAULT_HTTP_PORT); + + // ************************************ + // Test + // ************************************ + + @Test + public void test() throws Exception { + for (String id : CLIENTS) { + SCHEDULER.submit(() -> run(id)); + } + + LATCH.await(1, TimeUnit.MINUTES); + SCHEDULER.shutdownNow(); + + Assert.assertEquals(CLIENTS.size(), RESULTS.size()); + Assert.assertTrue(RESULTS.containsAll(CLIENTS)); + } + + // ************************************ + // Run a Camel node + // ************************************ + + private static void run(String id) { + try { + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); + + ConsulClusterService service = new ConsulClusterService(); + service.setId("node-" + id); + service.setUrl(String.format("http://%s:%d", CONSUL_HOST, CONSUL_PORT)); + + LOGGER.info("Consul URL {}", service.getUrl()); + + DefaultCamelContext context = new DefaultCamelContext(); + context.disableJMX(); + context.setName("context-" + id); + context.addService(service); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:consul?delay=1s&period=1s") + .routeId("route-" + id) + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + + LOGGER.debug("Shutting down node {}", id); + RESULTS.add(id); + + context.stop(); + + LATCH.countDown(); + } catch (Exception e) { + LOGGER.warn("", e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryMain.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryMain.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryMain.java new file mode 100644 index 0000000..8179ea7 --- /dev/null +++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyFactoryMain.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.consul.ha; + +import java.util.UUID; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.ExplicitCamelContextNameStrategy; +import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.main.Main; +import org.apache.camel.main.MainListenerSupport; + +public final class ConsulClusteredRoutePolicyFactoryMain { + private ConsulClusteredRoutePolicyFactoryMain() { + } + + public static void main(String[] args) throws Exception { + final String id = UUID.randomUUID().toString(); + + Main main = new Main(); + main.addMainListener(new MainListenerSupport() { + @Override + public void configure(CamelContext context) { + try { + ConsulClusterService service = new ConsulClusterService(); + service.setId("node-" + id); + service.setUrl(args[0]); + + context.setNameStrategy(new ExplicitCamelContextNameStrategy("camel-" + id)); + context.addService(service); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + main.addRouteBuilder(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:clustered?delay=1s&period=1s") + .routeId("route-" + id) + .log("Route ${routeId} is running ..."); + } + }); + + main.run(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java index 2a79682..2c003a6 100644 --- a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java +++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyIT.java @@ -29,7 +29,7 @@ import java.util.stream.IntStream; import com.orbitz.consul.Consul; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.impl.ha.ClusteredRoutePolicy; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -80,12 +80,12 @@ public class ConsulClusteredRoutePolicyIT { context.disableJMX(); context.setName("context-" + id); context.addService(service); - context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("timer:consul?delay=1s&period=1s") .routeId("route-" + id) + .routePolicy(ClusteredRoutePolicy.forNamespace("my-ns")) .log("From ${routeId}") .process(e -> contextLatch.countDown()); } http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java deleted file mode 100644 index 6f0cb00..0000000 --- a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyMain.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.consul.ha; - -import java.util.UUID; - -import org.apache.camel.CamelContext; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.impl.ExplicitCamelContextNameStrategy; -import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; -import org.apache.camel.main.Main; -import org.apache.camel.main.MainListenerSupport; - -public final class ConsulClusteredRoutePolicyMain { - private ConsulClusteredRoutePolicyMain() { - } - - public static void main(String[] args) throws Exception { - final String id = UUID.randomUUID().toString(); - - Main main = new Main(); - main.addMainListener(new MainListenerSupport() { - @Override - public void configure(CamelContext context) { - try { - ConsulClusterService service = new ConsulClusterService(); - service.setId("node-" + id); - service.setUrl(args[0]); - - context.setNameStrategy(new ExplicitCamelContextNameStrategy("camel-" + id)); - context.addService(service); - context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); - - main.addRouteBuilder(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("timer:clustered?delay=1s&period=1s") - .routeId("route-" + id) - .log("Route ${routeId} is running ..."); - } - }); - - main.run(); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryMain.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryMain.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryMain.java new file mode 100644 index 0000000..0d455dd --- /dev/null +++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryMain.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper.ha; + +import java.util.UUID; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.ExplicitCamelContextNameStrategy; +import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.main.Main; +import org.apache.camel.main.MainListenerSupport; + +public final class ZooKeeperClusteredRoutePolicyFactoryMain { + private ZooKeeperClusteredRoutePolicyFactoryMain() { + } + + public static void main(String[] args) throws Exception { + final String id = UUID.randomUUID().toString(); + + Main main = new Main(); + main.addMainListener(new MainListenerSupport() { + @Override + public void configure(CamelContext context) { + try { + ZooKeeperClusterService service = new ZooKeeperClusterService(); + service.setId("node-" + id); + service.setNodes(args[0]); + service.setBasePath("/camel"); + + context.setNameStrategy(new ExplicitCamelContextNameStrategy("camel-" + id)); + context.addService(service); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + main.addRouteBuilder(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:clustered?delay=1s&period=1s") + .routeId("route-" + id) + .log("Route ${routeId} is running ..."); + } + }); + + main.run(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryTest.java new file mode 100644 index 0000000..a58cc02 --- /dev/null +++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyFactoryTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper.ha; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.zookeeper.ZooKeeperTestSupport; +import org.apache.camel.component.zookeeper.ZooKeeperTestSupport.TestZookeeperServer; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.test.AvailablePortFinder; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class ZooKeeperClusteredRoutePolicyFactoryTest { + private static final int PORT = AvailablePortFinder.getNextAvailable(); + private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperClusteredRoutePolicyFactoryTest.class); + private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList()); + private static final List<String> RESULTS = new ArrayList<>(); + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2); + private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size()); + + // ************************************ + // Test + // ************************************ + + @Test + public void test() throws Exception { + TestZookeeperServer server = null; + + try { + server = new TestZookeeperServer(PORT, true); + ZooKeeperTestSupport.waitForServerUp("localhost:" + PORT, 1000); + + for (String id : CLIENTS) { + SCHEDULER.submit(() -> run(id)); + } + + LATCH.await(1, TimeUnit.MINUTES); + SCHEDULER.shutdownNow(); + + Assert.assertEquals(CLIENTS.size(), RESULTS.size()); + Assert.assertTrue(RESULTS.containsAll(CLIENTS)); + } finally { + if (server != null) { + server.shutdown(); + } + } + } + + // ************************************ + // Run a Camel node + // ************************************ + + private static void run(String id) { + try { + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); + + ZooKeeperClusterService service = new ZooKeeperClusterService(); + service.setId("node-" + id); + service.setNodes("localhost:" + PORT); + service.setBasePath("/camel"); + + DefaultCamelContext context = new DefaultCamelContext(); + context.disableJMX(); + context.setName("context-" + id); + context.addService(service); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:zookeeper?delay=1s&period=1s") + .routeId("route-" + id) + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + + LOGGER.debug("Shutting down node {}", id); + RESULTS.add(id); + + context.stop(); + + LATCH.countDown(); + } catch (Exception e) { + LOGGER.warn("", e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java deleted file mode 100644 index 7b263c5..0000000 --- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyMain.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.zookeeper.ha; - -import java.util.UUID; - -import org.apache.camel.CamelContext; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.impl.ExplicitCamelContextNameStrategy; -import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; -import org.apache.camel.main.Main; -import org.apache.camel.main.MainListenerSupport; - -public final class ZooKeeperClusteredRoutePolicyMain { - private ZooKeeperClusteredRoutePolicyMain() { - } - - public static void main(String[] args) throws Exception { - final String id = UUID.randomUUID().toString(); - - Main main = new Main(); - main.addMainListener(new MainListenerSupport() { - @Override - public void configure(CamelContext context) { - try { - ZooKeeperClusterService service = new ZooKeeperClusterService(); - service.setId("node-" + id); - service.setNodes(args[0]); - service.setBasePath("/camel"); - - context.setNameStrategy(new ExplicitCamelContextNameStrategy("camel-" + id)); - context.addService(service); - context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); - - main.addRouteBuilder(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("timer:clustered?delay=1s&period=1s") - .routeId("route-" + id) - .log("Route ${routeId} is running ..."); - } - }); - - main.run(); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/1bf1450a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java index 6aa7469..6f60f06 100644 --- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java +++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java @@ -30,7 +30,7 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.zookeeper.ZooKeeperTestSupport; import org.apache.camel.component.zookeeper.ZooKeeperTestSupport.TestZookeeperServer; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.impl.ha.ClusteredRoutePolicy; import org.apache.camel.test.AvailablePortFinder; import org.junit.Assert; import org.junit.Test; @@ -91,12 +91,12 @@ public final class ZooKeeperClusteredRoutePolicyTest { context.disableJMX(); context.setName("context-" + id); context.addService(service); - context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("timer:zookeeper?delay=1s&period=1s") .routeId("route-" + id) + .routePolicy(ClusteredRoutePolicy.forNamespace("my-ns")) .log("From ${routeId}") .process(e -> contextLatch.countDown()); }