CAMEL-10054: Create camel-atomix component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/49076653 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/49076653 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/49076653 Branch: refs/heads/master Commit: 490766539527f823a32cb4aaffa46005956835c7 Parents: b6f1bdd Author: lburgazzoli <lburgazz...@gmail.com> Authored: Thu Jun 1 12:27:37 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Fri Jun 16 17:37:53 2017 +0200 ---------------------------------------------------------------------- .../atomix/ha/AtomixRoutePolicyMain.java | 49 +++++++++++++------- 1 file changed, 32 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/49076653/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java index 78ef149..0a126d3 100644 --- a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java @@ -36,6 +36,7 @@ import org.apache.camel.ha.CamelCluster; import org.apache.camel.ha.CamelClusterView; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.impl.ha.ClusteredRoutePolicy; +import org.apache.camel.spi.RoutePolicy; import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.util.FileUtil; import org.slf4j.Logger; @@ -44,23 +45,27 @@ import org.slf4j.LoggerFactory; public final class AtomixRoutePolicyMain { private static final Logger LOGGER = LoggerFactory.getLogger(AtomixRoutePolicyMain.class); + private static final List<Address> ADDRESSES = Arrays.asList( new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()), new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()), new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()) ); - private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(ADDRESSES.size() * 3); + private static final CountDownLatch LATCH = new CountDownLatch(ADDRESSES.size()); + private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(ADDRESSES.size() * 2); public static void main(final String[] args) throws Exception { for (Address address : ADDRESSES) { - EXECUTOR.submit(() -> setupContext(address)); + EXECUTOR.submit(() -> run(address)); } - EXECUTOR.awaitTermination(5, TimeUnit.MINUTES); + LATCH.await(); + + System.exit(0); } - static void setupContext(Address address) { + static void run(Address address) { try { final String id = String.format("atomix-%d", address.port()); final File path = new File("target", id); @@ -73,7 +78,7 @@ public final class AtomixRoutePolicyMain { .withStorage( Storage.builder() .withDirectory(path) - .withStorageLevel(StorageLevel.DISK) + .withStorageLevel(StorageLevel.MEMORY) .build()) .build() .bootstrap(ADDRESSES) @@ -86,34 +91,44 @@ public final class AtomixRoutePolicyMain { view.addEventListener((e, p) -> { if (view.getLocalMember().isMaster()) { - LOGGER.info("{}, is now master", address); - try { - EXECUTOR.schedule(latch::countDown, 10, TimeUnit.SECONDS); - } catch (Exception ex) { - throw new RuntimeException(ex); - } + LOGGER.info("Member {} ({}), is now master", view.getLocalMember().getId(), address); + + // Shutdown the context later on so the next one should take + // the leadership + EXECUTOR.schedule(latch::countDown, 10, TimeUnit.SECONDS); } }); - context.disableJMX(); - context.addService(cluster, true, true); + context.addService(cluster); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - fromF("timer:%s?period=1s", id) - .routeId(id) - .routePolicy(new ClusteredRoutePolicy(view)) + RoutePolicy policy = ClusteredRoutePolicy.forView(view); + + fromF("timer:%s-1?period=2s", id) + .routeId(id + "-1") + .routePolicy(policy) .setHeader("ClusterMaster") .body(b -> view.getMaster().getId()) - .log("${routeId} - master is: ${header.ClusterMaster}"); + .log("${routeId} (1) - master is: ${header.ClusterMaster}"); + fromF("timer:%s-2?period=5s", id) + .routeId(id + "-2") + .routePolicy(policy) + .setHeader("ClusterMaster") + .body(b -> view.getMaster().getId()) + .log("${routeId} (2) - master is: ${header.ClusterMaster}"); } }); context.start(); latch.await(); context.stop(); + + LATCH.countDown(); } catch (Exception e) { throw new RuntimeException(e); } + + LOGGER.info("Done {}", address); } }