CAMEL-10320: Camel Master component for clustered services
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1c8bca8e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1c8bca8e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1c8bca8e Branch: refs/heads/master Commit: 1c8bca8e842bc2fd85bd865f8c6968ddda8329d6 Parents: 14f2391 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Mon Sep 25 17:05:25 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Wed Sep 27 16:17:01 2017 +0200 ---------------------------------------------------------------------- bom/camel-bom/pom.xml | 10 ++ components/camel-consul/pom.xml | 5 + .../component/consul/ha/ConsulMasterIT.java | 109 +++++++++++++ .../src/test/resources/log4j2.properties | 2 + components/camel-master/pom.xml | 9 +- .../src/main/docs/master-component.adoc | 122 ++++++++------ .../camel/component/master/MasterComponent.java | 59 ++++++- .../camel/component/master/MasterConsumer.java | 34 ++-- .../camel/component/master/MasterEndpoint.java | 21 ++- .../component/master/MasterComponentTest.java | 40 +++-- .../master/util/InMemoryClusterMember.java | 49 ------ .../master/util/InMemoryClusterService.java | 47 ------ .../master/util/InMemoryClusterView.java | 159 ------------------- .../src/test/resources/jgroups-tcp.xml | 53 ------- .../src/test/resources/log4j2.properties | 5 +- ...ooKeeperClusteredRouteConfigurationTest.java | 26 +++ .../zookeeper/ha/ZooKeeperMasterMain.java | 12 +- .../zookeeper/ha/ZooKeeperMasterTest.java | 20 +-- ...ZooKeeperClusteredRouteConfigurationTest.xml | 42 +++++ .../MasterComponentConfiguration.java | 33 +++- .../camel-spring-boot-dependencies/pom.xml | 10 ++ 21 files changed, 443 insertions(+), 424 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/bom/camel-bom/pom.xml ---------------------------------------------------------------------- diff --git a/bom/camel-bom/pom.xml b/bom/camel-bom/pom.xml index 4e9a433..175fd27 100644 --- a/bom/camel-bom/pom.xml +++ b/bom/camel-bom/pom.xml @@ -1576,6 +1576,16 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-master</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-master-starter</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-metrics</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-consul/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-consul/pom.xml b/components/camel-consul/pom.xml index 8133b47..cbe3a26 100644 --- a/components/camel-consul/pom.xml +++ b/components/camel-consul/pom.xml @@ -71,6 +71,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-master</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-ribbon</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulMasterIT.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulMasterIT.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulMasterIT.java new file mode 100644 index 0000000..5dbbb5c --- /dev/null +++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulMasterIT.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.consul.ha; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.orbitz.consul.Consul; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConsulMasterIT { + private static final Logger LOGGER = LoggerFactory.getLogger(ConsulMasterIT.class); + private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList()); + private static final List<String> RESULTS = new ArrayList<>(); + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2); + private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size()); + private static final String CONSUL_HOST = System.getProperty("camel.consul.host", Consul.DEFAULT_HTTP_HOST); + private static final int CONSUL_PORT = Integer.getInteger("camel.consul.port", Consul.DEFAULT_HTTP_PORT); + + // ************************************ + // Test + // ************************************ + + @Test + public void test() throws Exception { + for (String id : CLIENTS) { + SCHEDULER.submit(() -> run(id)); + } + + LATCH.await(1, TimeUnit.MINUTES); + SCHEDULER.shutdownNow(); + + Assert.assertEquals(CLIENTS.size(), RESULTS.size()); + Assert.assertTrue(RESULTS.containsAll(CLIENTS)); + } + + // ************************************ + // Run a Camel node + // ************************************ + + private static void run(String id) { + try { + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); + + ConsulClusterService service = new ConsulClusterService(); + service.setId("node-" + id); + service.setUrl(String.format("http://%s:%d", CONSUL_HOST, CONSUL_PORT)); + + LOGGER.info("Consul URL {}", service.getUrl()); + + DefaultCamelContext context = new DefaultCamelContext(); + context.disableJMX(); + context.setName("context-" + id); + context.addService(service); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("master:my-ns:timer:consul?delay=1s&period=1s") + .routeId("route-" + id) + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + + LOGGER.debug("Shutting down node {}", id); + RESULTS.add(id); + + context.stop(); + + LATCH.countDown(); + } catch (Exception e) { + LOGGER.warn("", e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-consul/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/test/resources/log4j2.properties b/components/camel-consul/src/test/resources/log4j2.properties index a4da2bf..059cf25 100644 --- a/components/camel-consul/src/test/resources/log4j2.properties +++ b/components/camel-consul/src/test/resources/log4j2.properties @@ -40,6 +40,8 @@ logger.camel-ha.name = org.apache.camel.ha logger.camel-ha.level = DEBUG logger.camel-impl-ha.name = org.apache.camel.impl.ha logger.camel-impl-ha.level = DEBUG +logger.camel-master.name = org.apache.camel.component.master +logger.camel-master.level = DEBUG rootLogger.level = INFO #rootLogger.appenderRef.stdout.ref = out http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-master/pom.xml b/components/camel-master/pom.xml index 58390b4..f103c8e 100644 --- a/components/camel-master/pom.xml +++ b/components/camel-master/pom.xml @@ -34,8 +34,7 @@ <properties> <camel.osgi.import> - !com.google.common.base;, - !org.apache.camel.component.master.group, + !org.apache.camel.component.master, * </camel.osgi.import> <camel.osgi.export.pkg> @@ -57,12 +56,6 @@ <artifactId>camel-test</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>org.jgroups</groupId> - <artifactId>jgroups</artifactId> - <version>${jgroups-version}</version> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/main/docs/master-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-master/src/main/docs/master-component.adoc b/components/camel-master/src/main/docs/master-component.adoc index bce8b37..a5969bd 100644 --- a/components/camel-master/src/main/docs/master-component.adoc +++ b/components/camel-master/src/main/docs/master-component.adoc @@ -2,7 +2,7 @@ *Available as of Camel version 2.20* -The **zookeeper-master:** endpoint provides a way to ensure only a single consumer in a cluster consumes from a given endpoint; +The **camel-master:** endpoint provides a way to ensure only a single consumer in a cluster consumes from a given endpoint; with automatic failover if that JVM dies. This can be very useful if you need to consume from some legacy back end which either doesn't support concurrent @@ -10,11 +10,11 @@ consumption or due to commercial or stability reasons you can only have a single ### Using the master endpoint -Just prefix any camel endpoint with **zookeeper-master:someName:** where _someName_ is a logical name and is +Just prefix any camel endpoint with **master:someName:** where _someName_ is a logical name and is used to acquire the master lock. e.g. ``` -from("zookeeper-master:cheese:jms:foo").to("activemq:wine"); +from("master:cheese:jms:foo").to("activemq:wine"); ``` The above simulates the [Exclusive Consumers](http://activemq.apache.org/exclusive-consumer.html) type feature in ActiveMQ; but on any third party JMS provider which maybe doesn't support exclusive consumers. @@ -24,7 +24,7 @@ ActiveMQ; but on any third party JMS provider which maybe doesn't support exclus [source] ---- -zookeeper-master:name:endpoint[?options] +master:namespace:endpoint[?options] ---- Where endpoint is any Camel endpoint you want to run in master/slave mode. @@ -33,7 +33,17 @@ Where endpoint is any Camel endpoint you want to run in master/slave mode. ### Options // component options: START -The Master component has no options. +The Master component supports 3 options which are listed below. + + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *service* (advanced) | Inject the service to use. | | CamelClusterService +| *serviceSelector* (advanced) | Inject the service selector used to lookup the CamelClusterService to use. | | Selector +| *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean +|=== // component options: END // endpoint options: START @@ -73,59 +83,77 @@ You can protect a clustered Camel application to only consume files from one act [source,java] ---- - // the file endpoint we want to consume from - String url = "file:target/inbox?delete=true"; - - // use the zookeeper master component in the clustered group named myGroup - // to run a master/slave mode in the following Camel url - from("zookeeper-master:myGroup:" + url) - .log(name + " - Received file: ${file:name}") - .delay(delay) - .log(name + " - Done file: ${file:name}") - .to("file:target/outbox"); +// the file endpoint we want to consume from +String url = "file:target/inbox?delete=true"; + +// use the camel master component in the clustered group named myGroup +// to run a master/slave mode in the following Camel url +from("master:myGroup:" + url) + .log(name + " - Received file: ${file:name}") + .delay(delay) + .log(name + " - Done file: ${file:name}") + .to("file:target/outbox"); ---- -ZooKeeper will by default connect to `localhost:2181`, but you can configure this on the component level. +The master component leverages CamelClusterService you can configure using +* *Java* ++ [source,java] ---- - MasterComponent master = new MasterComponent(); - master.setZooKeeperUrl("myzookeeper:2181"); ----- - -However you can also configure the url of the ZooKeeper ensemble using environment variables. +ZooKeeperClusterService service = new ZooKeeperClusterService(); +service.setId("camel-node-1"); +service.setNodes("myzk:2181"); +service.setBasePath("/camel/ha"); - export ZOOKEEPER_URL = "myzookeeper:2181" - -## Master RoutePolicy +context.addService(service) +---- -You can also use a `RoutePolicy` to control routes in master/slave mode. +* *Xml (Spring/Blueprint)* ++ +[source,xml] +---- +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring + http://camel.apache.org/schema/spring/camel-spring.xsd"> + + + <bean id="ha" class="org.apache.camel.component.zookeeper.ha.ZooKeeperClusterService"> + <property name="id" value="camel-node-1"/> + <property name="basePath" value="/camel/ha"/> + <property name="nodes" value="myzk:2181"/> + </bean> + + <camelContext xmlns="http://camel.apache.org/schema/spring" autoStartup="false"> + ... + </camelContext> + +</beans> +---- -When doing so you must configure the route policy with +* *Spring boot* ++ +[source,properties] +---- +camel.component.zookeeper.cluster.service.enabled = true +camel.component.zookeeper.cluster.service.id = camel-node-1 +camel.component.zookeeper.cluster.service.base-path = /camel/ha +camel.component.zookeeper.cluster.service.nodes = myzk:2181 +---- -- url to zookeeper ensemble -- name of cluster group -- *important* and set the route to not auto startup +### Implementations -A little example +Camel provide the following ClusterService implementations: -[source,java] ----- - MasterRoutePolicy master = new MasterRoutePolicy(); - master.setZooKeeperUrl("localhost:2181"); - master.setGroupName("myGroup"); - - // its import to set the route to not auto startup - // as we let the route policy start/stop the routes when it becomes a master/slave etc - from("file:target/inbox?delete=true").noAutoStartup() - // use the zookeeper master route policy in the clustered group - // to run this route in master/slave mode - .routePolicy(master) - .log(name + " - Received file: ${file:name}") - .delay(delay) - .log(name + " - Done file: ${file:name}") - .to("file:target/outbox"); ----- +- camel-atomix +- camel-consul +- camel-file +- camel-kubernetes +- camel-zookeeper ### See Also http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java index 3c10d9d..6d2b3dc 100644 --- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java +++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java @@ -20,7 +20,11 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; +import org.apache.camel.ha.CamelClusterService; import org.apache.camel.impl.DefaultComponent; +import org.apache.camel.impl.ha.ClusterServiceHelper; +import org.apache.camel.impl.ha.ClusterServiceSelectors; +import org.apache.camel.spi.Metadata; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StringHelper; @@ -31,6 +35,10 @@ import org.apache.camel.util.StringHelper; * a single consumer. */ public class MasterComponent extends DefaultComponent { + @Metadata(label = "advanced") + private CamelClusterService service; + @Metadata(label = "advanced") + private CamelClusterService.Selector serviceSelector; public MasterComponent() { this(null); @@ -38,6 +46,8 @@ public class MasterComponent extends DefaultComponent { public MasterComponent(CamelContext context) { super(context); + + this.serviceSelector = ClusterServiceSelectors.DEFAULT_SELECTOR; } @Override @@ -55,6 +65,53 @@ public class MasterComponent extends DefaultComponent { delegateUri = delegateUri + "?" + uri.substring(uri.indexOf('?') + 1); } - return new MasterEndpoint(uri, this, namespace, delegateUri); + return new MasterEndpoint( + uri, + this, + getClusterService(), + namespace, + delegateUri + ); + } + + public CamelClusterService getService() { + return service; + } + + /** + * Inject the service to use. + */ + public void setService(CamelClusterService service) { + this.service = service; + } + + public CamelClusterService.Selector getServiceSelector() { + return serviceSelector; + } + + /** + * + * Inject the service selector used to lookup the {@link CamelClusterService} to use. + */ + public void setServiceSelector(CamelClusterService.Selector serviceSelector) { + this.serviceSelector = serviceSelector; + } + + // ******************************** + // Helpers + // ******************************** + + private CamelClusterService getClusterService() throws Exception { + if (service == null) { + CamelContext context = getCamelContext(); + + ObjectHelper.notNull(context, "Camel Context"); + + service = ClusterServiceHelper.lookupService(context, serviceSelector).orElseThrow( + () -> new IllegalStateException("No cluster service found") + ); + } + + return service; } } http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java index b693464..999ac71 100644 --- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java +++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java @@ -16,7 +16,8 @@ */ package org.apache.camel.component.master; -import org.apache.camel.CamelContext; +import java.util.Optional; + import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Processor; @@ -36,19 +37,20 @@ import org.slf4j.LoggerFactory; @ManagedResource(description = "Managed Master Consumer") public class MasterConsumer extends DefaultConsumer { - private static final transient Logger LOGER = LoggerFactory.getLogger(MasterConsumer.class); + private static final transient Logger LOGGER = LoggerFactory.getLogger(MasterConsumer.class); + private final CamelClusterService clusterService; private final MasterEndpoint masterEndpoint; private final Endpoint delegatedEndpoint; private final Processor processor; private final CamelClusterEventListener.Leadership leadershipListener; private Consumer delegatedConsumer; - private CamelClusterService service; - private CamelClusterView view; + private volatile CamelClusterView view; - public MasterConsumer(MasterEndpoint masterEndpoint, Processor processor) { + public MasterConsumer(MasterEndpoint masterEndpoint, Processor processor, CamelClusterService clusterService) { super(masterEndpoint, processor); + this.clusterService = clusterService; this.masterEndpoint = masterEndpoint; this.delegatedEndpoint = masterEndpoint.getEndpoint(); this.processor = processor; @@ -59,14 +61,9 @@ public class MasterConsumer extends DefaultConsumer { protected void doStart() throws Exception { super.doStart(); - CamelContext context = super.getEndpoint().getCamelContext(); - service = context.hasService(CamelClusterService.class); - - if (service == null) { - throw new IllegalStateException("No cluster service found"); - } + LOGGER.debug("Using ClusterService instance {} (id={}, type={})", clusterService, clusterService.getId(), clusterService.getClass().getName()); - view = service.getView(masterEndpoint.getNamespace()); + view = clusterService.getView(masterEndpoint.getNamespace()); view.addEventListener(leadershipListener); if (isMaster()) { @@ -80,6 +77,9 @@ public class MasterConsumer extends DefaultConsumer { if (view != null) { view.removeEventListener(leadershipListener); + clusterService.releaseView(view); + + view = null; } ServiceHelper.stopAndShutdownServices(delegatedConsumer); @@ -107,7 +107,7 @@ public class MasterConsumer extends DefaultConsumer { @ManagedAttribute(description = "Are we the master") public boolean isMaster() { return view != null - ? view.getLocalMember().isMaster() + ? view.getLocalMember().isLeader() : false; } @@ -132,7 +132,7 @@ public class MasterConsumer extends DefaultConsumer { ServiceHelper.startService(delegatedEndpoint); ServiceHelper.startService(delegatedConsumer); - LOGER.info("Leadership taken: consumer started: {}", delegatedEndpoint); + LOGGER.info("Leadership taken: consumer started: {}", delegatedEndpoint); } private synchronized void onLeadershipLost() throws Exception { @@ -141,7 +141,7 @@ public class MasterConsumer extends DefaultConsumer { delegatedConsumer = null; - LOGER.info("Leadership lost: consumer stopped: {}", delegatedEndpoint); + LOGGER.info("Leadership lost: consumer stopped: {}", delegatedEndpoint); } // ************************************** @@ -150,13 +150,13 @@ public class MasterConsumer extends DefaultConsumer { private final class LeadershipListener implements CamelClusterEventListener.Leadership { @Override - public void leadershipChanged(CamelClusterView view, CamelClusterMember leader) { + public void leadershipChanged(CamelClusterView view, Optional<CamelClusterMember> leader) { if (!isRunAllowed()) { return; } try { - if (view.getLocalMember().isMaster()) { + if (view.getLocalMember().isLeader()) { onLeadershipTaken(); } else if (delegatedConsumer != null) { onLeadershipLost(); http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java index 2b63597..10547a5 100644 --- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java +++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java @@ -23,14 +23,22 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.ha.CamelClusterService; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriPath; @ManagedResource(description = "Managed Master Endpoint") -@UriEndpoint(firstVersion = "2.20.0", scheme = "master", syntax = "master:namespace:delegateUri", consumerClass = MasterConsumer.class, consumerOnly = true, - title = "Master", lenientProperties = true, label = "clustering") +@UriEndpoint( + firstVersion = "2.20.0", + scheme = "master", + syntax = "master:namespace:delegateUri", + consumerClass = MasterConsumer.class, + consumerOnly = true, + title = "Master", + lenientProperties = true, + label = "clustering") public class MasterEndpoint extends DefaultEndpoint implements DelegateEndpoint { private final Endpoint delegateEndpoint; @@ -43,8 +51,12 @@ public class MasterEndpoint extends DefaultEndpoint implements DelegateEndpoint @Metadata(required = "true") private final String delegateUri; - public MasterEndpoint(String uri, MasterComponent component, String namespace, String delegateUri) { + private final CamelClusterService clusterService; + + public MasterEndpoint(String uri, MasterComponent component, CamelClusterService clusterService, String namespace, String delegateUri) { super(uri, component); + + this.clusterService = clusterService; this.namespace = namespace; this.delegateUri = delegateUri; this.delegateEndpoint = getCamelContext().getEndpoint(delegateUri); @@ -52,12 +64,13 @@ public class MasterEndpoint extends DefaultEndpoint implements DelegateEndpoint @Override public Producer createProducer() throws Exception { + getComponent(); throw new UnsupportedOperationException("Cannot produce from this endpoint"); } @Override public Consumer createConsumer(Processor processor) throws Exception { - return new MasterConsumer(this, processor); + return new MasterConsumer(this, processor, clusterService); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java b/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java index bbd8ce3..c252ccc 100644 --- a/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java +++ b/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java @@ -27,9 +27,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.master.util.InMemoryClusterService; +import org.apache.camel.component.file.ha.FileLockClusterService; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.test.AvailablePortFinder; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -39,17 +38,14 @@ import org.slf4j.LoggerFactory; public class MasterComponentTest { private static final Logger LOGGER = LoggerFactory.getLogger(MasterComponentTest.class); private static final List<String> INSTANCES = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList()); - private static final List<Integer> PORTS = INSTANCES.stream().map(i -> AvailablePortFinder.getNextAvailable()).collect(Collectors.toList()); private static final List<String> RESULTS = new ArrayList<>(); - private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(INSTANCES.size() * 2); + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(INSTANCES.size()); private static final CountDownLatch LATCH = new CountDownLatch(INSTANCES.size()); @Test public void test() throws Exception { - for (int i = 0; i < INSTANCES.size(); i++) { - int index = i; - SCHEDULER.submit(() -> run(INSTANCES.get(index), index, PORTS)); - Thread.sleep(1000); + for (String instance: INSTANCES) { + SCHEDULER.submit(() -> run(instance)); } LATCH.await(1, TimeUnit.MINUTES); @@ -63,13 +59,16 @@ public class MasterComponentTest { // Run a Camel node // ************************************ - private static void run(String id, int index, List<Integer> ports) { + private static void run(String id) { try { - CountDownLatch contextLatch = new CountDownLatch(1); + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); - InMemoryClusterService service = new InMemoryClusterService(); - service.setIndex(index); - service.setPorts(ports); + FileLockClusterService service = new FileLockClusterService(); + service.setId(id); + service.setRoot("target/ha"); + service.setAcquireLockDelay(1, TimeUnit.SECONDS); + service.setAcquireLockInterval(1, TimeUnit.SECONDS); DefaultCamelContext context = new DefaultCamelContext(); context.disableJMX(); @@ -78,15 +77,10 @@ public class MasterComponentTest { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("master:ns:timer:test?delay=1s&period=1s&repeatCount=1") + from("master:ns:timer:test?delay=1s&period=1s") .routeId("route-" + id) - .process(e -> { - LOGGER.info("Node {} done", id); - RESULTS.add(id); - // Shutdown the context later on to give a chance to - // other members to catch-up - SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS); - }); + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); } }); @@ -96,6 +90,10 @@ public class MasterComponentTest { context.start(); contextLatch.await(); + + LOGGER.debug("Shutting down node {}", id); + RESULTS.add(id); + context.stop(); LATCH.countDown(); http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterMember.java ---------------------------------------------------------------------- diff --git a/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterMember.java b/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterMember.java deleted file mode 100644 index cfe3fcc..0000000 --- a/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterMember.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.master.util; - -import java.util.List; - -import org.apache.camel.ha.CamelClusterMember; -import org.apache.camel.util.ObjectHelper; -import org.jgroups.Address; -import org.jgroups.JChannel; -import org.jgroups.View; - -public class InMemoryClusterMember implements CamelClusterMember { - private final JChannel channel; - private final View view; - - public InMemoryClusterMember(JChannel channel) { - this.channel = channel; - this.view = channel.getView(); - } - - @Override - public boolean isMaster() { - final List<Address> members = view.getMembers(); - - return ObjectHelper.isNotEmpty(members) - ? members.get(0).equals(channel.getAddress()) - : false; - } - - @Override - public String getId() { - return channel.name(); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterService.java ---------------------------------------------------------------------- diff --git a/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterService.java b/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterService.java deleted file mode 100644 index 7b40a72..0000000 --- a/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterService.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.master.util; - -import java.util.List; - -import org.apache.camel.impl.ha.AbstractCamelClusterService; - -public class InMemoryClusterService extends AbstractCamelClusterService<InMemoryClusterView> { - private int index; - private List<Integer> ports; - - @Override - protected InMemoryClusterView createView(String namespace) throws Exception { - return new InMemoryClusterView(this, namespace); - } - - public int getIndex() { - return index; - } - - public void setIndex(int index) { - this.index = index; - } - - public List<Integer> getPorts() { - return ports; - } - - public void setPorts(List<Integer> ports) { - this.ports = ports; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterView.java b/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterView.java deleted file mode 100644 index 41728e1..0000000 --- a/components/camel-master/src/test/java/org/apache/camel/component/master/util/InMemoryClusterView.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.master.util; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; - -import static java.util.stream.Collectors.toList; - -import org.apache.camel.ha.CamelClusterMember; -import org.apache.camel.ha.CamelClusterService; -import org.apache.camel.impl.ha.AbstractCamelClusterView; -import org.apache.camel.util.ObjectHelper; -import org.jgroups.Address; -import org.jgroups.JChannel; -import org.jgroups.PhysicalAddress; -import org.jgroups.ReceiverAdapter; -import org.jgroups.View; -import org.jgroups.protocols.TCP; -import org.jgroups.protocols.TCPPING; -import org.jgroups.stack.IpAddress; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class InMemoryClusterView extends AbstractCamelClusterView { - private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryClusterView.class); - - private JChannel channel; - - protected InMemoryClusterView(CamelClusterService cluster, String namespace) { - super(cluster, namespace); - } - - @Override - public Optional<CamelClusterMember> getMaster() { - return channel != null - ? Optional.of(new InMemoryClusterMember(channel)) - : Optional.empty(); - } - - @Override - public CamelClusterMember getLocalMember() { - return new LocalClusterMember(); - } - - @Override - public List<CamelClusterMember> getMembers() { - if (channel != null) { - channel.getView().getMembers().stream() - .map(ClusterMember::new) - .collect(toList()); - } - - return Collections.emptyList(); - } - - @Override - protected void doStart() throws Exception { - final int index = getClusterService().unwrap(InMemoryClusterService.class).getIndex(); - final List<Integer> ports = getClusterService().unwrap(InMemoryClusterService.class).getPorts(); - final List<PhysicalAddress> addresses = new ArrayList<>(); - - for (Integer port: ports) { - addresses.add(new IpAddress("127.0.0.1", port)); - } - - this.channel = new JChannel(getClass().getResourceAsStream("/jgroups-tcp.xml")); - - TCP tcp = this.channel.getProtocolStack().findProtocol(TCP.class); - tcp.setBindAddress(InetAddress.getByName("127.0.0.1")); - tcp.setBindPort(ports.get(index)); - - TCPPING tcpping = this.channel.getProtocolStack().findProtocol(TCPPING.class); - tcpping.setInitialHosts(addresses); - - this.channel.setReceiver(new ReceiverAdapter() { - @Override - public void viewAccepted(View view) { - fireLeadershipChangedEvent(new ClusterMember(view.getMembers().get(0))); - } - }); - - this.channel.connect(getNamespace()); - } - - @Override - protected void doStop() throws Exception { - if (channel != null) { - channel.close(); - } - } - - // *********************************** - // - // *********************************** - - private class LocalClusterMember implements CamelClusterMember { - @Override - public boolean isMaster() { - if (channel == null) { - return false; - } - - List<Address> members = channel.view().getMembers(); - - if (ObjectHelper.isNotEmpty(members)) { - LOGGER.info("master={}, channel={}, members={}", members.get(0), channel.getAddress(), members); - return members.get(0).equals(channel.getAddress()); - } - - return false; - } - - @Override - public String getId() { - return channel != null ? channel.getAddressAsString() : "local"; - } - } - - private class ClusterMember implements CamelClusterMember { - private final Address address; - - public ClusterMember(Address address) { - this.address = address; - } - - @Override - public boolean isMaster() { - final List<Address> members = channel.view().getMembers(); - - return ObjectHelper.isNotEmpty(members) - ? members.get(0).equals(address) - : false; - } - - @Override - public String getId() { - return channel.getAddressAsString(); - } - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/test/resources/jgroups-tcp.xml ---------------------------------------------------------------------- diff --git a/components/camel-master/src/test/resources/jgroups-tcp.xml b/components/camel-master/src/test/resources/jgroups-tcp.xml deleted file mode 100644 index 3e6d58d..0000000 --- a/components/camel-master/src/test/resources/jgroups-tcp.xml +++ /dev/null @@ -1,53 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ---> -<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xmlns="urn:org:jgroups" - xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd"> - <TCP bind_port="7800" - recv_buf_size="${tcp.recv_buf_size:130k}" - send_buf_size="${tcp.send_buf_size:130k}" - max_bundle_size="64K" - sock_conn_timeout="300" - - thread_pool.min_threads="0" - thread_pool.max_threads="20" - thread_pool.keep_alive_time="30000"/> - - <TCPPING async_discovery="true" - initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7800],localhost[7801]}" - port_range="2"/> - <MERGE3 min_interval="10000" - max_interval="30000"/> - <FD_SOCK/> - <FD timeout="3000" max_tries="3" /> - <VERIFY_SUSPECT timeout="1500" /> - <BARRIER /> - <pbcast.NAKACK2 use_mcast_xmit="false" - discard_delivered_msgs="true"/> - <UNICAST3 /> - <pbcast.STABLE desired_avg_gossip="50000" - max_bytes="4M"/> - <pbcast.GMS print_local_addr="true" join_timeout="2000"/> - <MFC max_credits="2M" - min_threshold="0.4"/> - <FRAG2 frag_size="60K" /> - <!--RSVP resend_interval="2000" timeout="10000"/--> - <pbcast.STATE_TRANSFER/> -</config> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-master/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-master/src/test/resources/log4j2.properties b/components/camel-master/src/test/resources/log4j2.properties index e5a24c1f..40a55e4 100644 --- a/components/camel-master/src/test/resources/log4j2.properties +++ b/components/camel-master/src/test/resources/log4j2.properties @@ -25,6 +25,9 @@ appender.out.name = out appender.out.layout.type = PatternLayout appender.out.layout.pattern = [%t] %c{1} %-5p %m%n + +logger.camel-file-ha.name = org.apache.camel.component.file.ha +logger.camel-file-ha.level = DEBUG logger.camel-master.name = org.apache.camel.component.master logger.camel-master.level = DEBUG logger.camel.name = org.apache.camel @@ -34,7 +37,7 @@ logger.springframework.name = org.springframework logger.springframework.level = WARN rootLogger.level = INFO -rootLogger.appenderRef.stdout.ref = out +#rootLogger.appenderRef.stdout.ref = out rootLogger.appenderRef.file.ref = file http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.java new file mode 100644 index 0000000..f02df61 --- /dev/null +++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.java @@ -0,0 +1,26 @@ +package org.apache.camel.component.zookeeper.ha; + +import org.apache.camel.ha.CamelClusterService; +import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +public class SpringZooKeeperClusteredRouteConfigurationTest extends CamelSpringTestSupport { + + @Test + public void test() { + assertNotNull(context.hasService(CamelClusterService.class)); + assertTrue(context.getRoutePolicyFactories().stream().anyMatch(ClusteredRoutePolicyFactory.class::isInstance)); + } + + // *********************** + // Routes + // *********************** + + @Override + protected AbstractApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.xml"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterMain.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterMain.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterMain.java index 148a5dd..1c70ff0 100644 --- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterMain.java +++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterMain.java @@ -21,6 +21,7 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.ExplicitCamelContextNameStrategy; import org.apache.camel.main.Main; import org.apache.camel.main.MainListenerSupport; @@ -29,16 +30,17 @@ public final class ZooKeeperMasterMain { final String nodeId = UUID.randomUUID().toString(); final String address = args[0]; - ZooKeeperClusterService service = new ZooKeeperClusterService(); - service.setId("node-" + nodeId); - service.setNodes(address); - service.setBasePath("/camel/master"); - Main main = new Main(); main.addMainListener(new MainListenerSupport() { @Override public void configure(CamelContext context) { try { + ZooKeeperClusterService service = new ZooKeeperClusterService(); + service.setId("node-" + nodeId); + service.setNodes(address); + service.setBasePath("/camel/master"); + + context.setNameStrategy(new ExplicitCamelContextNameStrategy("camel-" + nodeId)); context.addService(service); } catch (Exception e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterTest.java index 948834a..3e50bd2 100644 --- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterTest.java +++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperMasterTest.java @@ -41,7 +41,7 @@ public final class ZooKeeperMasterTest { private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperMasterTest.class); private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList()); private static final List<String> RESULTS = new ArrayList<>(); - private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2); + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size()); private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size()); // ************************************ @@ -78,7 +78,8 @@ public final class ZooKeeperMasterTest { private static void run(String id) { try { - CountDownLatch contextLatch = new CountDownLatch(1); + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); ZooKeeperClusterService service = new ZooKeeperClusterService(); service.setId("node-" + id); @@ -92,15 +93,10 @@ public final class ZooKeeperMasterTest { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("master:zk:timer:master?delay=1s&period=1s&repeatCount=1") + from("master:zk:timer:master?delay=1s&period=1s") .routeId("route-" + id) - .process(e -> { - LOGGER.debug("Node {} done", id); - RESULTS.add(id); - // Shutdown the context later on to give a chance to - // other members to catch-up - SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS); - }); + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); } }); @@ -110,6 +106,10 @@ public final class ZooKeeperMasterTest { context.start(); contextLatch.await(); + + LOGGER.debug("Shutting down node {}", id); + RESULTS.add(id); + context.stop(); LATCH.countDown(); http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.xml b/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.xml new file mode 100644 index 0000000..2ab9589 --- /dev/null +++ b/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/ha/SpringZooKeeperClusteredRouteConfigurationTest.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring + http://camel.apache.org/schema/spring/camel-spring.xsd"> + + + <bean id="cluster-service" class="org.apache.camel.component.zookeeper.ha.ZooKeeperClusterService"> + <property name="id" value="node-1"/> + <property name="basePath" value="/camel/ha"/> + <property name="nodes" value="localhost:2181"/> + </bean> + + <bean id="cluster-policy" class="org.apache.camel.impl.ha.ClusteredRoutePolicyFactory" factory-method="forNamespace"> + <constructor-arg value="my-ns"/> + </bean> + + <camelContext xmlns="http://camel.apache.org/schema/spring" autoStartup="false"> + </camelContext> + +</beans> http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/platforms/spring-boot/components-starter/camel-master-starter/src/main/java/org/apache/camel/component/master/springboot/MasterComponentConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-master-starter/src/main/java/org/apache/camel/component/master/springboot/MasterComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-master-starter/src/main/java/org/apache/camel/component/master/springboot/MasterComponentConfiguration.java index 9c104c1..3a1de95 100644 --- a/platforms/spring-boot/components-starter/camel-master-starter/src/main/java/org/apache/camel/component/master/springboot/MasterComponentConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-master-starter/src/main/java/org/apache/camel/component/master/springboot/MasterComponentConfiguration.java @@ -17,12 +17,14 @@ package org.apache.camel.component.master.springboot; import javax.annotation.Generated; +import org.apache.camel.ha.CamelClusterService; +import org.apache.camel.ha.CamelClusterService.Selector; import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.NestedConfigurationProperty; /** - * Represents an endpoint which only becomes active when it obtains the master - * lock + * Camel Master Support * * Generated by camel-package-maven-plugin - do not edit this file! */ @@ -33,12 +35,39 @@ public class MasterComponentConfiguration ComponentConfigurationPropertiesCommon { /** + * Inject the service to use. + */ + @NestedConfigurationProperty + private CamelClusterService service; + /** + * Inject the service selector used to lookup the CamelClusterService to + * use. + */ + @NestedConfigurationProperty + private Selector serviceSelector; + /** * Whether the component should resolve property placeholders on itself when * starting. Only properties which are of String type can use property * placeholders. */ private Boolean resolvePropertyPlaceholders = true; + public CamelClusterService getService() { + return service; + } + + public void setService(CamelClusterService service) { + this.service = service; + } + + public Selector getServiceSelector() { + return serviceSelector; + } + + public void setServiceSelector(Selector serviceSelector) { + this.serviceSelector = serviceSelector; + } + public Boolean getResolvePropertyPlaceholders() { return resolvePropertyPlaceholders; } http://git-wip-us.apache.org/repos/asf/camel/blob/1c8bca8e/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml b/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml index 46c4fab..7171372 100644 --- a/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml +++ b/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml @@ -1756,6 +1756,16 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-master</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-master-starter</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-metrics</artifactId> <version>${project.version}</version> </dependency>