Repository: camel Updated Branches: refs/heads/master 5de4ed40f -> ccd30cd25
CAMEL-11847: cluster-service : support multiple cluster services Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ab42e163 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ab42e163 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ab42e163 Branch: refs/heads/master Commit: ab42e1630d8508bed8817094fe4f71093c082cec Parents: 5de4ed4 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Wed Sep 27 08:44:37 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Wed Sep 27 16:16:58 2017 +0200 ---------------------------------------------------------------------- .../apache/camel/ha/CamelClusterService.java | 25 +- .../camel/ha/CamelClusterServiceHelper.java | 41 -- .../impl/ha/AbstractCamelClusterService.java | 29 ++ .../camel/impl/ha/ClusterServiceHelper.java | 54 +++ .../camel/impl/ha/ClusterServiceSelectors.java | 153 +++++++ .../camel/impl/ha/ClusteredRouteController.java | 29 +- .../camel/impl/ha/ClusteredRoutePolicy.java | 31 +- .../impl/ha/ClusteredRoutePolicyFactory.java | 31 +- .../management/mbean/ManagedClusterService.java | 10 +- .../camel/impl/ha/CamelClusterServiceTest.java | 155 ------- .../impl/ha/ClusterServiceSelectorTest.java | 404 +++++++++++++++++++ .../camel/impl/ha/ClusterServiceViewTest.java | 155 +++++++ .../atomix/ha/AtomixClusterClientService.java | 3 - .../atomix/ha/AtomixClusterService.java | 3 - .../xml/AbstractCamelContextFactoryBean.java | 11 +- .../spring/boot/CamelAutoConfiguration.java | 11 +- .../AtomixClusterServiceAutoConfiguration.java | 11 +- .../AtomixClusterServiceConfiguration.java | 27 ++ .../ConsulClusterServiceAutoConfiguration.java | 4 +- .../ha/ConsulClusterServiceConfiguration.java | 28 ++ ...FileLockClusterServiceAutoConfiguration.java | 8 +- .../ha/FileLockClusterServiceConfiguration.java | 28 ++ ...bernetesClusterServiceAutoConfiguration.java | 4 +- .../KubernetesClusterServiceConfiguration.java | 25 ++ 
...ooKeeperClusterServiceAutoConfiguration.java | 4 +- .../ZooKeeperClusterServiceConfiguration.java | 28 ++ 26 files changed, 1057 insertions(+), 255 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java index c8aff74..aa56cce 100644 --- a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java +++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterService.java @@ -17,12 +17,21 @@ package org.apache.camel.ha; import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; import org.apache.camel.CamelContextAware; +import org.apache.camel.Ordered; import org.apache.camel.Service; import org.apache.camel.spi.IdAware; -public interface CamelClusterService extends Service, CamelContextAware, IdAware { +public interface CamelClusterService extends Service, CamelContextAware, IdAware, Ordered { + + @Override + default int getOrder() { + return Ordered.LOWEST; + } /** * Get a view of the cluster bound to a namespace creating it if needed. Multiple @@ -68,6 +77,13 @@ public interface CamelClusterService extends Service, CamelContextAware, IdAware boolean isLeader(String namespace); /** + * Attributes associated to the service. + */ + default Map<String, Object> getAttributes() { + return Collections.emptyMap(); + } + + /** * Access the underlying concrete CamelClusterService implementation to * provide access to further features. 
* @@ -83,4 +99,11 @@ public interface CamelClusterService extends Service, CamelContextAware, IdAware "Unable to unwrap this CamelClusterService type (" + getClass() + ") to the required type (" + clazz + ")" ); } + + interface Selector { + /** + * Select a specific CamelClusterService instance among a collection. + */ + Optional<CamelClusterService> select(Collection<CamelClusterService> services); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/camel-core/src/main/java/org/apache/camel/ha/CamelClusterServiceHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterServiceHelper.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterServiceHelper.java deleted file mode 100644 index 96cc292..0000000 --- a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterServiceHelper.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.ha; - -import java.util.Optional; -import java.util.function.Predicate; - -import org.apache.camel.CamelContext; - -public final class CamelClusterServiceHelper { - private CamelClusterServiceHelper() { - } - - public static Optional<CamelClusterService> lookupService(CamelContext context) { - return Optional.ofNullable(context.hasService(CamelClusterService.class)); - } - - public static Optional<CamelClusterService> lookupService(CamelContext context, Predicate<CamelClusterService> selector) { - for (CamelClusterService service: context.hasServices(CamelClusterService.class)) { - if (selector.test(service)) { - return Optional.of(service); - } - } - - return Optional.empty(); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java index 1e33442..a9b2e6a 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterService.java @@ -17,12 +17,14 @@ package org.apache.camel.impl.ha; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.concurrent.locks.StampedLock; import org.apache.camel.CamelContext; +import org.apache.camel.Ordered; import org.apache.camel.ha.CamelClusterMember; import org.apache.camel.ha.CamelClusterService; import org.apache.camel.ha.CamelClusterView; @@ -36,7 +38,9 @@ public abstract class AbstractCamelClusterService<T extends CamelClusterView> ex private static final Logger LOGGER = LoggerFactory.getLogger(AbstractCamelClusterService.class); private final Map<String, 
ViewHolder<T>> views; + private final Map<String, Object> attributes; private final StampedLock lock; + private int order; private String id; private CamelContext camelContext; @@ -49,10 +53,21 @@ public abstract class AbstractCamelClusterService<T extends CamelClusterView> ex } protected AbstractCamelClusterService(String id, CamelContext camelContext) { + this.order = Ordered.LOWEST; this.id = id; this.camelContext = camelContext; this.views = new HashMap<>(); this.lock = new StampedLock(); + this.attributes = new HashMap<>(); + } + + @Override + public int getOrder() { + return order; + } + + public void setOrder(int order) { + this.order = order; } @Override @@ -84,6 +99,20 @@ public abstract class AbstractCamelClusterService<T extends CamelClusterView> ex return camelContext; } + public void setAttributes(Map<String, Object> attributes) { + this.attributes.clear(); + this.attributes.putAll(attributes); + } + + public void setAttribute(String key, Object value) { + this.attributes.put(key, value); + } + + @Override + public Map<String, Object> getAttributes() { + return Collections.unmodifiableMap(attributes); + } + @Override protected void doStart() throws Exception { LockHelper.doWithReadLockT( http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/camel-core/src/main/java/org/apache/camel/impl/ha/ClusterServiceHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusterServiceHelper.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusterServiceHelper.java new file mode 100644 index 0000000..b008af4 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusterServiceHelper.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.ha; + +import java.util.Optional; +import java.util.Set; + +import org.apache.camel.CamelContext; +import org.apache.camel.ha.CamelClusterService; +import org.apache.camel.util.ObjectHelper; + +public final class ClusterServiceHelper { + private ClusterServiceHelper() { + } + + public static Optional<CamelClusterService> lookupService(CamelContext context) { + return lookupService(context, ClusterServiceSelectors.DEFAULT_SELECTOR); + } + + public static Optional<CamelClusterService> lookupService(CamelContext context, CamelClusterService.Selector selector) { + ObjectHelper.notNull(context, "Camel Context"); + ObjectHelper.notNull(selector, "ClusterService selector"); + + Set<CamelClusterService> services = context.hasServices(CamelClusterService.class); + + if (ObjectHelper.isNotEmpty(services)) { + return selector.select(services); + } + + return Optional.empty(); + } + + public static CamelClusterService mandatoryLookupService(CamelContext context) { + return lookupService(context).orElseThrow(() -> new IllegalStateException("CamelCluster service not found")); + } + + public static CamelClusterService mandatoryLookupService(CamelContext context, CamelClusterService.Selector selector) { + return lookupService(context, selector).orElseThrow(() -> new IllegalStateException("CamelCluster service not found")); + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/camel-core/src/main/java/org/apache/camel/impl/ha/ClusterServiceSelectors.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusterServiceSelectors.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusterServiceSelectors.java new file mode 100644 index 0000000..2939afc --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusterServiceSelectors.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl.ha; + +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.camel.ha.CamelClusterService; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class ClusterServiceSelectors { + public static final CamelClusterService.Selector DEFAULT_SELECTOR = new SelectSingle(); + private static final Logger LOGGER = LoggerFactory.getLogger(ClusterServiceSelectors.class); + + private ClusterServiceSelectors() { + } + + public static final class SelectSingle implements CamelClusterService.Selector { + @Override + public Optional<CamelClusterService> select(Collection<CamelClusterService> services) { + if (services != null && services.size() == 1) { + return Optional.of(services.iterator().next()); + } else if (services != null && services.size() > 1) { + LOGGER.warn("Multiple CamelClusterService instances available (items={})", services); + } + + return Optional.empty(); + } + } + + public static final class SelectFirst implements CamelClusterService.Selector { + @Override + public Optional<CamelClusterService> select(Collection<CamelClusterService> services) { + return ObjectHelper.isNotEmpty(services) + ?
Optional.of(services.iterator().next()) + : Optional.empty(); + } + } + + public static final class SelectByOrder implements CamelClusterService.Selector { + @Override + public Optional<CamelClusterService> select(Collection<CamelClusterService> services) { + Optional<Map.Entry<Integer, List<CamelClusterService>>> highPriorityServices = services.stream() + .collect(Collectors.groupingBy(CamelClusterService::getOrder)) + .entrySet().stream() + .min(Comparator.comparingInt(Map.Entry::getKey)); + + + if (highPriorityServices.isPresent()) { + if (highPriorityServices.get().getValue().size() == 1) { + return Optional.of(highPriorityServices.get().getValue().iterator().next()); + } else { + LOGGER.warn("Multiple CamelClusterService instances available for highest priority (order={}, items={})", + highPriorityServices.get().getKey(), + highPriorityServices.get().getValue() + ); + } + } + + return Optional.empty(); + } + } + + public static final class SelectByType implements CamelClusterService.Selector { + private final Class<? extends CamelClusterService> type; + + public SelectByType(Class<? 
extends CamelClusterService> type) { + this.type = type; + } + + @Override + public Optional<CamelClusterService> select(Collection<CamelClusterService> services) { + for (CamelClusterService service : services) { + if (type.isInstance(service)) { + return Optional.of(service); + } + } + + return Optional.empty(); + } + } + + public static final class SelectByAttribute implements CamelClusterService.Selector { + private final String key; + private final Object value; + + public SelectByAttribute(String key, Object value) { + this.key = key; + this.value = value; + } + + @Override + public Optional<CamelClusterService> select(Collection<CamelClusterService> services) { + for (CamelClusterService service : services) { + Map<String, Object> attributes = service.getAttributes(); + + if (ObjectHelper.equal(attributes.get(key), value)) { + return Optional.of(service); + } + } + + return Optional.empty(); + } + } + + // ********************************** + // Helpers + // ********************************** + + public static CamelClusterService.Selector defaultSelector() { + return DEFAULT_SELECTOR; + } + + public static CamelClusterService.Selector single() { + return new SelectSingle(); + } + + public static CamelClusterService.Selector first() { + return new SelectFirst(); + } + + public static CamelClusterService.Selector order() { + return new SelectByOrder(); + } + + public static CamelClusterService.Selector type(Class<? 
extends CamelClusterService> type) { + return new SelectByType(type); + } + + public static CamelClusterService.Selector attribute(String key, Object value) { + return new SelectByAttribute(key, value); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java index 2a1849e..a07ed86 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRouteController.java @@ -51,11 +51,13 @@ public class ClusteredRouteController extends DefaultRouteController { private final PolicyFactory policyFactory; private final ClusteredRouteConfiguration defaultConfiguration; private CamelClusterService clusterService; + private CamelClusterService.Selector clusterServiceSelector; public ClusteredRouteController() { this.routes = new CopyOnWriteArraySet<>(); this.configurations = new ConcurrentHashMap<>(); this.filters = new ArrayList<>(); + this.clusterServiceSelector = ClusterServiceSelectors.DEFAULT_SELECTOR; this.policyFactory = new PolicyFactory(); this.defaultConfiguration = new ClusteredRouteConfiguration(); @@ -133,15 +135,28 @@ public class ClusteredRouteController extends DefaultRouteController { return clusterService; } + /** + * Set the cluster service to use. 
+ */ public void setClusterService(CamelClusterService clusterService) { - // prevent replacing the service - if (this.clusterService != null && this.clusterService != clusterService) { - throw new IllegalArgumentException("CamelClusterService is already set"); - } + ObjectHelper.notNull(clusterService, "CamelClusterService"); this.clusterService = clusterService; } + public CamelClusterService.Selector getClusterServiceSelector() { + return clusterServiceSelector; + } + + /** + * Set the selector strategy used to look-up a {@link CamelClusterService} when none has been set explicitly (must not be null) + */ + public void setClusterServiceSelector(CamelClusterService.Selector clusterServiceSelector) { + ObjectHelper.notNull(clusterServiceSelector, "CamelClusterService.Selector"); + + this.clusterServiceSelector = clusterServiceSelector; + } + // ******************************* // // ******************************* @@ -165,10 +180,10 @@ public class ClusteredRouteController extends DefaultRouteController { if (clusterService == null) { // Finally try to grab it from the camel context. - clusterService = context.hasService(CamelClusterService.class); + clusterService = ClusterServiceHelper.mandatoryLookupService(context, clusterServiceSelector); } - ObjectHelper.notNull(clusterService, "clusterService"); + LOGGER.debug("Using ClusterService instance {} (id={}, type={})", clusterService, clusterService.getId(), clusterService.getClass().getName()); if (!ServiceHelper.isStarted(clusterService)) { // Start the cluster service if not yet started.
@@ -294,7 +309,7 @@ public class ClusteredRouteController extends DefaultRouteController { final String namespace = ObjectHelper.supplyIfEmpty(configuration.getNamespace(), defaultConfiguration::getNamespace); final Duration initialDelay = ObjectHelper.supplyIfEmpty(configuration.getInitialDelay(), defaultConfiguration::getInitialDelay); - ClusteredRoutePolicy policy = new ClusteredRoutePolicy(clusterService.getView(namespace)); + ClusteredRoutePolicy policy = ClusteredRoutePolicy.forView(clusterService.getView(namespace)); policy.setCamelContext(getCamelContext()); policy.setInitialDelay(initialDelay); http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java index 8e923d1..5b45cbc 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java @@ -45,7 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @ManagedResource(description = "Clustered Route policy using") -public class ClusteredRoutePolicy extends RoutePolicySupport implements CamelContextAware { +public final class ClusteredRoutePolicy extends RoutePolicySupport implements CamelContextAware { private static final Logger LOGGER = LoggerFactory.getLogger(ClusteredRoutePolicy.class); private final AtomicBoolean leader; @@ -61,7 +61,7 @@ public class ClusteredRoutePolicy extends RoutePolicySupport implements CamelCon private CamelContext camelContext; - public ClusteredRoutePolicy(CamelClusterView clusterView) { + private ClusteredRoutePolicy(CamelClusterView clusterView) { this.clusterView = clusterView; this.leadershipEventListener = new CamelClusterLeadershipListener(); @@ 
-317,23 +317,34 @@ public class ClusteredRoutePolicy extends RoutePolicySupport implements CamelCon // Static helpers // **************************************************** - public static ClusteredRoutePolicy forNamespace(CamelContext camelContext, String namespace) throws Exception { - CamelClusterService cluster = camelContext.hasService(CamelClusterService.class); - if (cluster == null) { - throw new IllegalStateException("CamelCluster service not found"); - } + public static ClusteredRoutePolicy forNamespace(CamelContext camelContext, CamelClusterService.Selector selector, String namespace) throws Exception { + final CamelClusterService service = ClusterServiceHelper.lookupService(camelContext, selector).orElseThrow( + () -> new IllegalStateException("CamelCluster service not found") + ); - return forNamespace(cluster, namespace); + return forNamespace(service, namespace); } - public static ClusteredRoutePolicy forNamespace(CamelClusterService cluster, String namespace) throws Exception { - return forView(cluster.getView(namespace)); + public static ClusteredRoutePolicy forNamespace(CamelContext camelContext, String namespace) throws Exception { + return forNamespace(camelContext, ClusterServiceSelectors.DEFAULT_SELECTOR, namespace); + } + + public static ClusteredRoutePolicy forNamespace(CamelClusterService service, String namespace) throws Exception { + return forView(service.getView(namespace)); } public static ClusteredRoutePolicy forView(CamelClusterView view) throws Exception { + ClusteredRoutePolicy policy = new ClusteredRoutePolicy(view); policy.setCamelContext(view.getCamelContext()); + LOGGER.debug("ClusteredRoutePolicy {} is using ClusterService instance {} (id={}, type={})", + policy, + view.getClusterService(), + view.getClusterService().getId(), + view.getClusterService().getClass().getName() + ); + return policy; } } 
http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicyFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicyFactory.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicyFactory.java index cd8782b..41efd67 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicyFactory.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicyFactory.java @@ -27,15 +27,32 @@ import org.apache.camel.util.ObjectHelper; public class ClusteredRoutePolicyFactory implements RoutePolicyFactory { private final String namespace; private final CamelClusterService clusterService; + private final CamelClusterService.Selector clusterServiceSelector; - public ClusteredRoutePolicyFactory(String viewName) { - this.namespace = ObjectHelper.notNull(viewName, "Cluster View Namespace"); + public ClusteredRoutePolicyFactory(String namespace) { + ObjectHelper.notNull(namespace, "Cluster View Namespace"); + + this.namespace = namespace; + this.clusterService = null; + this.clusterServiceSelector = ClusterServiceSelectors.DEFAULT_SELECTOR; + } + + public ClusteredRoutePolicyFactory(CamelClusterService.Selector selector, String namespace) { + ObjectHelper.notNull(namespace, "Cluster View Namespace"); + ObjectHelper.notNull(selector, "Cluster Service Selector"); + + this.namespace = namespace; this.clusterService = null; + this.clusterServiceSelector = selector; } public ClusteredRoutePolicyFactory(CamelClusterService clusterService, String viewName) { - this.namespace = ObjectHelper.notNull(viewName, "Cluster View Namespace"); - this.clusterService = ObjectHelper.notNull(clusterService, "Cluster Service"); + ObjectHelper.notNull(clusterService, "Cluster Service"); + ObjectHelper.notNull(viewName, "Cluster View Namespace"); + + this.clusterService = 
clusterService; + this.namespace = viewName; + this.clusterServiceSelector = null; } @Override @@ -43,7 +60,7 @@ public class ClusteredRoutePolicyFactory implements RoutePolicyFactory { try { return clusterService != null ? ClusteredRoutePolicy.forNamespace(clusterService, namespace) - : ClusteredRoutePolicy.forNamespace(camelContext, namespace); + : ClusteredRoutePolicy.forNamespace(camelContext, clusterServiceSelector, namespace); } catch (Exception e) { throw new RuntimeCamelException(e); } @@ -57,6 +74,10 @@ public class ClusteredRoutePolicyFactory implements RoutePolicyFactory { return new ClusteredRoutePolicyFactory(namespace); } + public static ClusteredRoutePolicyFactory forNamespace(CamelClusterService.Selector selector, String namespace) { + return new ClusteredRoutePolicyFactory(selector, namespace); + } + public static ClusteredRoutePolicyFactory forNamespace(CamelClusterService clusterService, String namespace) { return new ClusteredRoutePolicyFactory(clusterService, namespace); } http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java index 7bb3197..3ccc7a7 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java @@ -25,7 +25,7 @@ import org.apache.camel.ServiceStatus; import org.apache.camel.StatefulService; import org.apache.camel.api.management.mbean.ManagedClusterServiceMBean; import org.apache.camel.ha.CamelClusterService; -import org.apache.camel.ha.CamelClusterServiceHelper; +import org.apache.camel.impl.ha.ClusterServiceHelper; import 
org.apache.camel.spi.ManagementStrategy; public class ManagedClusterService implements ManagedClusterServiceMBean { @@ -84,14 +84,14 @@ public class ManagedClusterService implements ManagedClusterServiceMBean { @Override public Collection<String> getNamespaces() { - return CamelClusterServiceHelper.lookupService(context) + return ClusterServiceHelper.lookupService(context) .map(CamelClusterService::getNamespaces) .orElseGet(Collections::emptyList); } @Override public void startView(String namespace) throws Exception { - Optional<CamelClusterService> service = CamelClusterServiceHelper.lookupService(context); + Optional<CamelClusterService> service = ClusterServiceHelper.lookupService(context); if (service.isPresent()) { service.get().startView(namespace); } @@ -99,7 +99,7 @@ public class ManagedClusterService implements ManagedClusterServiceMBean { @Override public void stopView(String namespace) throws Exception { - Optional<CamelClusterService> service = CamelClusterServiceHelper.lookupService(context); + Optional<CamelClusterService> service = ClusterServiceHelper.lookupService(context); if (service.isPresent()) { service.get().stopView(namespace); } @@ -107,7 +107,7 @@ public class ManagedClusterService implements ManagedClusterServiceMBean { @Override public boolean isLeader(String namespace) { - return CamelClusterServiceHelper.lookupService(context) + return ClusterServiceHelper.lookupService(context) .map(s -> s.isLeader(namespace)) .orElse(false); } http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterServiceTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterServiceTest.java b/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterServiceTest.java deleted file mode 100644 index df45ff0..0000000 --- a/camel-core/src/test/java/org/apache/camel/impl/ha/CamelClusterServiceTest.java 
+++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.impl.ha; - -import java.util.List; -import java.util.Optional; - -import org.apache.camel.ServiceStatus; -import org.apache.camel.ha.CamelClusterMember; -import org.apache.camel.ha.CamelClusterService; -import org.junit.Assert; -import org.junit.Test; - -public class CamelClusterServiceTest { - - @Test - public void testViewEquality() throws Exception { - TestClusterService service = new TestClusterService(); - TestClusterView view1 = service.getView("ns1").unwrap(TestClusterView.class); - TestClusterView view2 = service.getView("ns1").unwrap(TestClusterView.class); - TestClusterView view3 = service.getView("ns2").unwrap(TestClusterView.class); - - Assert.assertEquals(view1, view2); - Assert.assertNotEquals(view1, view3); - } - - @Test - public void testViewReferences() throws Exception { - TestClusterService service = new TestClusterService(); - service.start(); - - TestClusterView view1 = service.getView("ns1").unwrap(TestClusterView.class); - TestClusterView view2 = service.getView("ns1").unwrap(TestClusterView.class); - TestClusterView view3 = service.getView("ns2").unwrap(TestClusterView.class); - - 
Assert.assertEquals(ServiceStatus.Started, view1.getStatus()); - Assert.assertEquals(ServiceStatus.Started, view2.getStatus()); - Assert.assertEquals(ServiceStatus.Started, view3.getStatus()); - - service.releaseView(view1); - - Assert.assertEquals(ServiceStatus.Started, view1.getStatus()); - Assert.assertEquals(ServiceStatus.Started, view2.getStatus()); - Assert.assertEquals(ServiceStatus.Started, view3.getStatus()); - - service.releaseView(view2); - - Assert.assertEquals(ServiceStatus.Stopped, view1.getStatus()); - Assert.assertEquals(ServiceStatus.Stopped, view2.getStatus()); - Assert.assertEquals(ServiceStatus.Started, view3.getStatus()); - - service.releaseView(view3); - - TestClusterView newView1 = service.getView("ns1").unwrap(TestClusterView.class); - TestClusterView newView2 = service.getView("ns1").unwrap(TestClusterView.class); - - Assert.assertEquals(newView1, newView2); - Assert.assertEquals(view1, newView1); - Assert.assertEquals(view1, newView2); - - Assert.assertEquals(ServiceStatus.Started, newView1.getStatus()); - Assert.assertEquals(ServiceStatus.Started, newView2.getStatus()); - Assert.assertEquals(ServiceStatus.Stopped, view3.getStatus()); - - service.stop(); - - Assert.assertEquals(ServiceStatus.Stopped, view1.getStatus()); - Assert.assertEquals(ServiceStatus.Stopped, view2.getStatus()); - Assert.assertEquals(ServiceStatus.Stopped, view3.getStatus()); - Assert.assertEquals(ServiceStatus.Stopped, newView1.getStatus()); - Assert.assertEquals(ServiceStatus.Stopped, newView2.getStatus()); - } - - @Test - public void testViewForceOperations() throws Exception { - TestClusterService service = new TestClusterService(); - TestClusterView view = service.getView("ns1").unwrap(TestClusterView.class); - - Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); - - // This should not start the view as the service has not yet started. 
- service.startView(view.getNamespace()); - - Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); - - // This should start the view. - service.start(); - - Assert.assertEquals(ServiceStatus.Started, view.getStatus()); - - service.stopView(view.getNamespace()); - Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); - - service.startView(view.getNamespace()); - Assert.assertEquals(ServiceStatus.Started, view.getStatus()); - - service.releaseView(view); - Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); - } - - // ********************************* - // Helpers - // ********************************* - - private static class TestClusterView extends AbstractCamelClusterView { - - public TestClusterView(CamelClusterService cluster, String namespace) { - super(cluster, namespace); - } - - @Override - public Optional<CamelClusterMember> getMaster() { - return null; - } - - @Override - public CamelClusterMember getLocalMember() { - return null; - } - - @Override - public List<CamelClusterMember> getMembers() { - return null; - } - - @Override - protected void doStart() throws Exception { - } - - @Override - protected void doStop() throws Exception { - } - } - - private static class TestClusterService extends AbstractCamelClusterService<TestClusterView> { - @Override - protected TestClusterView createView(String namespace) throws Exception { - return new TestClusterView(this, namespace); - } - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceSelectorTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceSelectorTest.java b/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceSelectorTest.java new file mode 100644 index 0000000..ba4549f --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceSelectorTest.java @@ -0,0 +1,404 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.ha; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.apache.camel.CamelContext; +import org.apache.camel.component.file.ha.FileLockClusterService; +import org.apache.camel.ha.CamelClusterMember; +import org.apache.camel.ha.CamelClusterService; +import org.apache.camel.ha.CamelClusterView; +import org.apache.camel.impl.DefaultCamelContext; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.camel.impl.ha.ClusterServiceHelper.lookupService; +import static org.apache.camel.impl.ha.ClusterServiceHelper.mandatoryLookupService; + +public class ClusterServiceSelectorTest { + + @Test + public void testDefaultSelector() throws Exception { + CamelContext context = null; + + try { + DummyClusterService1 service1 = new DummyClusterService1(); + + context = new DefaultCamelContext(); + context.addService(service1); + + Optional<CamelClusterService> lookup = lookupService(context); + + Assert.assertTrue(lookup.isPresent()); + Assert.assertEquals(service1, lookup.get()); + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test + public void 
testDefaultSelectorFailure() throws Exception { + CamelContext context = null; + + try { + context = new DefaultCamelContext(); + context.addService(new DummyClusterService1()); + context.addService(new DummyClusterService2()); + + Optional<CamelClusterService> lookup = lookupService(context); + + Assert.assertFalse(lookup.isPresent()); + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test + public void testSelectSingle() throws Exception { + CamelContext context = null; + + try { + DummyClusterService1 service1 = new DummyClusterService1(); + + context = new DefaultCamelContext(); + context.addService(service1); + + CamelClusterService.Selector selector = ClusterServiceSelectors.single(); + Optional<CamelClusterService> lookup = lookupService(context, selector); + + Assert.assertTrue(lookup.isPresent()); + Assert.assertEquals(service1, lookup.get()); + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test + public void testSelectSingleFailure() throws Exception { + CamelContext context = null; + + try { + context = new DefaultCamelContext(); + context.addService(new DummyClusterService1()); + context.addService(new DummyClusterService2()); + + CamelClusterService.Selector selector = ClusterServiceSelectors.single(); + Optional<CamelClusterService> lookup = lookupService(context, selector); + + Assert.assertFalse(lookup.isPresent()); + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test + public void testSelectFirst() throws Exception { + CamelContext context = null; + + try { + context = new DefaultCamelContext(); + context.addService(new DummyClusterService1()); + context.addService(new DummyClusterService2()); + + CamelClusterService.Selector selector = ClusterServiceSelectors.first(); + Optional<CamelClusterService> lookup = lookupService(context, selector); + + Assert.assertTrue(lookup.isPresent()); + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test + public 
void testSelectByType() throws Exception { + CamelContext context = null; + + try { + context = new DefaultCamelContext(); + context.addService(new DummyClusterService1()); + context.addService(new DummyClusterService2()); + + Assert.assertTrue(lookupService(context, ClusterServiceSelectors.type(DummyClusterService1.class)).isPresent()); + Assert.assertTrue(lookupService(context, ClusterServiceSelectors.type(DummyClusterService2.class)).isPresent()); + Assert.assertFalse(lookupService(context, ClusterServiceSelectors.type(FileLockClusterService.class)).isPresent()); + + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test + public void testSelectByAttribute() throws Exception { + CamelContext context = null; + + try { + DummyClusterService1 service1 = new DummyClusterService1(); + service1.setAttribute("service.type", "zookeeper"); + + DummyClusterService2 service2 = new DummyClusterService2(); + service2.setAttribute("service.type", "file"); + + context = new DefaultCamelContext(); + context.addService(service1); + context.addService(service2); + + Optional<CamelClusterService> lookup; + + + lookup = lookupService(context, ClusterServiceSelectors.attribute("service.type", "zookeeper")); + Assert.assertTrue(lookup.isPresent()); + Assert.assertEquals(service1, lookup.get()); + + lookup = lookupService(context, ClusterServiceSelectors.attribute("service.type", "file")); + Assert.assertTrue(lookup.isPresent()); + Assert.assertEquals(service2, lookup.get()); + + lookup = lookupService(context, ClusterServiceSelectors.attribute("service.type", "consul")); + Assert.assertFalse(lookup.isPresent()); + + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test + public void testSelectByOrder() throws Exception { + CamelContext context = null; + + try { + DummyClusterService1 service1 = new DummyClusterService1(); + service1.setOrder(1); + + DummyClusterService2 service2 = new DummyClusterService2(); + service2.setOrder(0); + + 
context = new DefaultCamelContext(); + context.addService(service1); + context.addService(service2); + + CamelClusterService.Selector selector = ClusterServiceSelectors.order(); + Optional<CamelClusterService> lookup = lookupService(context, selector); + + Assert.assertTrue(lookup.isPresent()); + Assert.assertEquals(service2, lookup.get()); + + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test + public void testSelectByOrderFailure() throws Exception { + CamelContext context = null; + + try { + DummyClusterService1 service1 = new DummyClusterService1(); + service1.setOrder(1); + + DummyClusterService2 service2 = new DummyClusterService2(); + service2.setOrder(0); + + DummyClusterService2 service3 = new DummyClusterService2(); + service3.setOrder(0); + + context = new DefaultCamelContext(); + context.addService(service1); + context.addService(service2); + context.addService(service3); + + CamelClusterService.Selector selector = ClusterServiceSelectors.order(); + Optional<CamelClusterService> lookup = lookupService(context, selector); + + Assert.assertFalse(lookup.isPresent()); + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test + public void testMandatoryLookup() throws Exception { + CamelContext context = null; + + try { + DummyClusterService1 service1 = new DummyClusterService1(); + + context = new DefaultCamelContext(); + context.addService(service1); + + CamelClusterService.Selector selector = ClusterServiceSelectors.single(); + CamelClusterService lookup = mandatoryLookupService(context, selector); + + Assert.assertNotNull(lookup); + Assert.assertEquals(service1, lookup); + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test + public void testMandatoryLookupWithoutSelector() throws Exception { + CamelContext context = null; + + try { + DummyClusterService1 service1 = new DummyClusterService1(); + + context = new DefaultCamelContext(); + context.addService(service1); + + 
CamelClusterService lookup = mandatoryLookupService(context); + + Assert.assertNotNull(lookup); + Assert.assertEquals(service1, lookup); + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test(expected = IllegalStateException.class) + public void testMandatoryLookupFailure() throws Exception { + CamelContext context = null; + + try { + context = new DefaultCamelContext(); + + mandatoryLookupService(context, ClusterServiceSelectors.single()); + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test(expected = IllegalStateException.class) + public void testMandatoryLookupFailureWithoutSelector() throws Exception { + CamelContext context = null; + + try { + context = new DefaultCamelContext(); + + mandatoryLookupService(context); + } finally { + if (context != null) { + context.stop(); + } + } + } + + // ************************************** + // Helpers + // ************************************** + + private final class DummyClusterService1 extends AbstractCamelClusterService { + public DummyClusterService1() { + } + + @Override + protected CamelClusterView createView(String namespace) throws Exception { + return new DummyClusterServiceView(this, namespace); + } + } + + private final class DummyClusterService2 extends AbstractCamelClusterService { + public DummyClusterService2() { + } + + @Override + protected CamelClusterView createView(String namespace) throws Exception { + return new DummyClusterServiceView(this, namespace); + } + } + + private final class DummyClusterServiceView extends AbstractCamelClusterView { + + public DummyClusterServiceView(CamelClusterService cluster, String namespace) { + super(cluster, namespace); + } + + @Override + public Optional<CamelClusterMember> getMaster() { + return Optional.empty(); + } + + @Override + public CamelClusterMember getLocalMember() { + return new DummyClusterServiceMember(false, true); + } + + @Override + public List<CamelClusterMember> getMembers() { + return 
Collections.emptyList(); + } + + @Override + protected void doStart() throws Exception { + } + + @Override + protected void doStop() throws Exception { + } + + private final class DummyClusterServiceMember implements CamelClusterMember { + private final boolean leader; + private final boolean local; + + public DummyClusterServiceMember(boolean leader, boolean local) { + this.leader = leader; + this.local = local; + } + + @Override + public boolean isLeader() { + return leader; + } + + @Override + public boolean isLocal() { + return local; + } + + @Override + public String getId() { + return getClusterService().getId(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java b/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java new file mode 100644 index 0000000..cc6090e --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl.ha; + +import java.util.List; +import java.util.Optional; + +import org.apache.camel.ServiceStatus; +import org.apache.camel.ha.CamelClusterMember; +import org.apache.camel.ha.CamelClusterService; +import org.junit.Assert; +import org.junit.Test; + +public class ClusterServiceViewTest { + + @Test + public void testViewEquality() throws Exception { + TestClusterService service = new TestClusterService(); + TestClusterView view1 = service.getView("ns1").unwrap(TestClusterView.class); + TestClusterView view2 = service.getView("ns1").unwrap(TestClusterView.class); + TestClusterView view3 = service.getView("ns2").unwrap(TestClusterView.class); + + Assert.assertEquals(view1, view2); + Assert.assertNotEquals(view1, view3); + } + + @Test + public void testViewReferences() throws Exception { + TestClusterService service = new TestClusterService(); + service.start(); + + TestClusterView view1 = service.getView("ns1").unwrap(TestClusterView.class); + TestClusterView view2 = service.getView("ns1").unwrap(TestClusterView.class); + TestClusterView view3 = service.getView("ns2").unwrap(TestClusterView.class); + + Assert.assertEquals(ServiceStatus.Started, view1.getStatus()); + Assert.assertEquals(ServiceStatus.Started, view2.getStatus()); + Assert.assertEquals(ServiceStatus.Started, view3.getStatus()); + + service.releaseView(view1); + + Assert.assertEquals(ServiceStatus.Started, view1.getStatus()); + Assert.assertEquals(ServiceStatus.Started, view2.getStatus()); + Assert.assertEquals(ServiceStatus.Started, view3.getStatus()); + + service.releaseView(view2); + + Assert.assertEquals(ServiceStatus.Stopped, view1.getStatus()); + Assert.assertEquals(ServiceStatus.Stopped, view2.getStatus()); + Assert.assertEquals(ServiceStatus.Started, view3.getStatus()); + + service.releaseView(view3); + + TestClusterView newView1 = service.getView("ns1").unwrap(TestClusterView.class); + TestClusterView newView2 = 
service.getView("ns1").unwrap(TestClusterView.class); + + Assert.assertEquals(newView1, newView2); + Assert.assertEquals(view1, newView1); + Assert.assertEquals(view1, newView2); + + Assert.assertEquals(ServiceStatus.Started, newView1.getStatus()); + Assert.assertEquals(ServiceStatus.Started, newView2.getStatus()); + Assert.assertEquals(ServiceStatus.Stopped, view3.getStatus()); + + service.stop(); + + Assert.assertEquals(ServiceStatus.Stopped, view1.getStatus()); + Assert.assertEquals(ServiceStatus.Stopped, view2.getStatus()); + Assert.assertEquals(ServiceStatus.Stopped, view3.getStatus()); + Assert.assertEquals(ServiceStatus.Stopped, newView1.getStatus()); + Assert.assertEquals(ServiceStatus.Stopped, newView2.getStatus()); + } + + @Test + public void testViewForceOperations() throws Exception { + TestClusterService service = new TestClusterService(); + TestClusterView view = service.getView("ns1").unwrap(TestClusterView.class); + + Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); + + // This should not start the view as the service has not yet started. + service.startView(view.getNamespace()); + + Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); + + // This should start the view. 
+ service.start(); + + Assert.assertEquals(ServiceStatus.Started, view.getStatus()); + + service.stopView(view.getNamespace()); + Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); + + service.startView(view.getNamespace()); + Assert.assertEquals(ServiceStatus.Started, view.getStatus()); + + service.releaseView(view); + Assert.assertEquals(ServiceStatus.Stopped, view.getStatus()); + } + + // ********************************* + // Helpers + // ********************************* + + private static class TestClusterView extends AbstractCamelClusterView { + + public TestClusterView(CamelClusterService cluster, String namespace) { + super(cluster, namespace); + } + + @Override + public Optional<CamelClusterMember> getMaster() { + return null; + } + + @Override + public CamelClusterMember getLocalMember() { + return null; + } + + @Override + public List<CamelClusterMember> getMembers() { + return null; + } + + @Override + protected void doStart() throws Exception { + } + + @Override + protected void doStop() throws Exception { + } + } + + private static class TestClusterService extends AbstractCamelClusterService<TestClusterView> { + @Override + protected TestClusterView createView(String namespace) throws Exception { + return new TestClusterView(this, namespace); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java index 6df320f..ab844e7 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java +++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java @@ -106,9 +106,6 @@ public final class AtomixClusterClientService extends AbstractCamelClusterServic @Override protected void doStart() throws Exception { - // instantiate a new atomix replica - getOrCreateClient(); - super.doStart(); } http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java index 1937a82..17c90e3 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java @@ -136,9 +136,6 @@ public final class AtomixClusterService extends AbstractCamelClusterService<Atom @Override protected void doStart() throws Exception { - // instantiate a new atomix replica - getOrCreateReplica(); - super.doStart(); } http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java ---------------------------------------------------------------------- diff --git a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java index 23f1e29..ae0715e 100644 --- a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java +++ b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java @@ -329,10 +329,13 @@ 
public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex } } // cluster service - CamelClusterService clusterService = getBeanForType(CamelClusterService.class); - if (clusterService != null) { - LOG.info("Using CamelClusterService: " + clusterService); - getContext().addService(clusterService); + Map<String, CamelClusterService> clusterServices = getContext().getRegistry().findByTypeWithName(CamelClusterService.class); + if (clusterServices != null && !clusterServices.isEmpty()) { + for (Entry<String, CamelClusterService> entry : clusterServices.entrySet()) { + CamelClusterService service = entry.getValue(); + LOG.info("Using CamelClusterService with id: {} and implementation: {}", service.getId(), service); + getContext().addService(service); + } } // add route policy factories Map<String, RoutePolicyFactory> routePolicyFactories = getContext().getRegistry().findByTypeWithName(RoutePolicyFactory.class); http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java b/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java index fc6cde6..009f3fa 100644 --- a/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java +++ b/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java @@ -459,10 +459,13 @@ public class CamelAutoConfiguration { } } // cluster service - CamelClusterService clusterService = getSingleBeanOfType(applicationContext, CamelClusterService.class); - if (clusterService != null) { - LOG.info("Using CamelClusterService: " + clusterService); - camelContext.addService(clusterService); + Map<String, CamelClusterService> 
clusterServices = applicationContext.getBeansOfType(CamelClusterService.class); + if (clusterServices != null && !clusterServices.isEmpty()) { + for (Map.Entry<String, CamelClusterService> entry : clusterServices.entrySet()) { + CamelClusterService service = entry.getValue(); + LOG.info("Using CamelClusterService with id: {} and implementation: {}", service.getId(), service); + camelContext.addService(service); + } } // add route policy factories Map<String, RoutePolicyFactory> routePolicyFactories = applicationContext.getBeansOfType(RoutePolicyFactory.class); http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceAutoConfiguration.java b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceAutoConfiguration.java index 286918f..c065bd6 100644 --- a/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceAutoConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceAutoConfiguration.java @@ -30,7 +30,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.AllNestedConditions; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import 
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -46,10 +45,9 @@ public class AtomixClusterServiceAutoConfiguration { @Autowired private AtomixClusterServiceConfiguration configuration; - @Bean(initMethod = "start", destroyMethod = "stop") + @Bean(name = "atomix-cluster-service") @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) @ConditionalOnProperty(prefix = "camel.component.atomix.cluster.service", name = "mode", havingValue = "node") - @ConditionalOnMissingBean public CamelClusterService atomixClusterService() { AtomixClusterService service = new AtomixClusterService(); service.setNodes(configuration.getNodes().stream().map(Address::new).collect(Collectors.toList())); @@ -60,20 +58,23 @@ public class AtomixClusterServiceAutoConfiguration { ObjectHelper.ifNotEmpty(configuration.getStoragePath(), service::setStoragePath); ObjectHelper.ifNotEmpty(configuration.getStorageLevel(), service::setStorageLevel); ObjectHelper.ifNotEmpty(configuration.getConfigurationUri(), service::setConfigurationUri); + ObjectHelper.ifNotEmpty(configuration.getAttributes(), service::setAttributes); + ObjectHelper.ifNotEmpty(configuration.getOrder(), service::setOrder); return service; } - @Bean(initMethod = "start", destroyMethod = "stop") + @Bean(name = "atomix-cluster-client-service") @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) @ConditionalOnProperty(prefix = "camel.component.atomix.cluster.service", name = "mode", havingValue = "client") - @ConditionalOnMissingBean public CamelClusterService atomixClusterClientService() { AtomixClusterClientService service = new AtomixClusterClientService(); service.setNodes(configuration.getNodes().stream().map(Address::new).collect(Collectors.toList())); ObjectHelper.ifNotEmpty(configuration.getId(), service::setId); ObjectHelper.ifNotEmpty(configuration.getConfigurationUri(), 
service::setConfigurationUri); + ObjectHelper.ifNotEmpty(configuration.getAttributes(), service::setAttributes); + ObjectHelper.ifNotEmpty(configuration.getOrder(), service::setOrder); return service; } http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceConfiguration.java b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceConfiguration.java index 8317d56..fb9bcba 100644 --- a/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/ha/springboot/AtomixClusterServiceConfiguration.java @@ -17,6 +17,7 @@ package org.apache.camel.component.atomix.ha.springboot; import java.util.HashSet; +import java.util.Map; import java.util.Set; import io.atomix.copycat.server.storage.StorageLevel; @@ -73,6 +74,16 @@ public class AtomixClusterServiceConfiguration { */ private String configurationUri; + /** + * Custom service attributes. + */ + private Map<String, Object> attributes; + + /** + * Service lookup order/priority. 
+ */ + private Integer order; + // ********************************* // Properties // ********************************* @@ -148,4 +159,20 @@ public class AtomixClusterServiceConfiguration { public void setConfigurationUri(String configurationUri) { this.configurationUri = configurationUri; } + + public Map<String, Object> getAttributes() { + return attributes; + } + + public void setAttributes(Map<String, Object> attributes) { + this.attributes = attributes; + } + + public Integer getOrder() { + return order; + } + + public void setOrder(Integer order) { + this.order = order; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceAutoConfiguration.java b/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceAutoConfiguration.java index 082bb7b..526e6c3 100644 --- a/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceAutoConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceAutoConfiguration.java @@ -24,7 +24,6 @@ import org.apache.camel.util.IntrospectionSupport; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.autoconfigure.AutoConfigureBefore; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import 
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -39,9 +38,8 @@ public class ConsulClusterServiceAutoConfiguration { @Autowired private ConsulClusterServiceConfiguration configuration; - @Bean(initMethod = "start", destroyMethod = "stop") + @Bean(name = "consul-cluster-service") @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) - @ConditionalOnMissingBean public CamelClusterService consulClusterService() throws Exception { ConsulClusterService service = new ConsulClusterService(); http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceConfiguration.java b/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceConfiguration.java index bc619c2..205483b 100644 --- a/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-consul-starter/src/main/java/org/apache/camel/component/consul/springboot/ha/ConsulClusterServiceConfiguration.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.consul.springboot.ha; +import java.util.Map; + import org.apache.camel.component.consul.ha.ConsulClusterConfiguration; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -31,6 +33,16 @@ public class ConsulClusterServiceConfiguration extends ConsulClusterConfiguratio */ 
private String id; + /** + * Custom service attributes. + */ + private Map<String, Object> attributes; + + /** + * Service lookup order/priority. + */ + private Integer order; + public boolean isEnabled() { return enabled; } @@ -46,4 +58,20 @@ public class ConsulClusterServiceConfiguration extends ConsulClusterConfiguratio public void setId(String id) { this.id = id; } + + public Map<String, Object> getAttributes() { + return attributes; + } + + public void setAttributes(Map<String, Object> attributes) { + this.attributes = attributes; + } + + public Integer getOrder() { + return order; + } + + public void setOrder(Integer order) { + this.order = order; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/component/file/springboot/ha/FileLockClusterServiceAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/component/file/springboot/ha/FileLockClusterServiceAutoConfiguration.java b/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/component/file/springboot/ha/FileLockClusterServiceAutoConfiguration.java index 689127e..988db9d 100644 --- a/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/component/file/springboot/ha/FileLockClusterServiceAutoConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/component/file/springboot/ha/FileLockClusterServiceAutoConfiguration.java @@ -28,7 +28,6 @@ import org.apache.camel.spring.boot.ha.ClusteredRouteControllerAutoConfiguration import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.autoconfigure.AutoConfigureBefore; -import 
org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -43,9 +42,8 @@ public class FileLockClusterServiceAutoConfiguration { @Autowired private FileLockClusterServiceConfiguration configuration; - @Bean(initMethod = "start", destroyMethod = "stop") + @Bean(name = "file-lock-cluster-service") @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) - @ConditionalOnMissingBean public CamelClusterService consulClusterService() throws Exception { FileLockClusterService service = new FileLockClusterService(); @@ -53,6 +51,10 @@ public class FileLockClusterServiceAutoConfiguration { .ifPresent(service::setId); Optional.ofNullable(configuration.getRoot()) .ifPresent(service::setRoot); + Optional.ofNullable(configuration.getOrder()) + .ifPresent(service::setOrder); + Optional.ofNullable(configuration.getAttributes()) + .ifPresent(service::setAttributes); Optional.ofNullable(configuration.getAcquireLockDelay()) .map(TimePatternConverter::toMilliSeconds) .ifPresent(v -> service.setAcquireLockDelay(v, TimeUnit.MILLISECONDS)); http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/component/file/springboot/ha/FileLockClusterServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/component/file/springboot/ha/FileLockClusterServiceConfiguration.java b/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/component/file/springboot/ha/FileLockClusterServiceConfiguration.java index 3ef175e..5a13324 100644 --- 
a/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/component/file/springboot/ha/FileLockClusterServiceConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/component/file/springboot/ha/FileLockClusterServiceConfiguration.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.file.springboot.ha; +import java.util.Map; + import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix = "camel.component.file.cluster.service") @@ -45,6 +47,16 @@ public class FileLockClusterServiceConfiguration { */ private String acquireLockInterval; + /** + * Custom service attributes. + */ + private Map<String, Object> attributes; + + /** + * Service lookup order/priority. + */ + private Integer order; + public boolean isEnabled() { return enabled; } @@ -84,4 +96,20 @@ public class FileLockClusterServiceConfiguration { public void setAcquireLockInterval(String acquireLockInterval) { this.acquireLockInterval = acquireLockInterval; } + + public Map<String, Object> getAttributes() { + return attributes; + } + + public void setAttributes(Map<String, Object> attributes) { + this.attributes = attributes; + } + + public Integer getOrder() { + return order; + } + + public void setOrder(Integer order) { + this.order = order; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/platforms/spring-boot/components-starter/camel-kubernetes-starter/src/main/java/org/apache/camel/component/kubernetes/springboot/ha/KubernetesClusterServiceAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-kubernetes-starter/src/main/java/org/apache/camel/component/kubernetes/springboot/ha/KubernetesClusterServiceAutoConfiguration.java 
b/platforms/spring-boot/components-starter/camel-kubernetes-starter/src/main/java/org/apache/camel/component/kubernetes/springboot/ha/KubernetesClusterServiceAutoConfiguration.java index f55b84e..6e7f444 100644 --- a/platforms/spring-boot/components-starter/camel-kubernetes-starter/src/main/java/org/apache/camel/component/kubernetes/springboot/ha/KubernetesClusterServiceAutoConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-kubernetes-starter/src/main/java/org/apache/camel/component/kubernetes/springboot/ha/KubernetesClusterServiceAutoConfiguration.java @@ -24,7 +24,6 @@ import org.apache.camel.util.IntrospectionSupport; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.autoconfigure.AutoConfigureBefore; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -39,9 +38,8 @@ public class KubernetesClusterServiceAutoConfiguration { @Autowired private KubernetesClusterServiceConfiguration configuration; - @Bean(initMethod = "start", destroyMethod = "stop") + @Bean(name = "kubernetes-cluster-service") @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) - @ConditionalOnMissingBean public CamelClusterService kubernetesClusterService() throws Exception { KubernetesClusterService service = new KubernetesClusterService(); http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/platforms/spring-boot/components-starter/camel-kubernetes-starter/src/main/java/org/apache/camel/component/kubernetes/springboot/ha/KubernetesClusterServiceConfiguration.java ---------------------------------------------------------------------- diff --git 
a/platforms/spring-boot/components-starter/camel-kubernetes-starter/src/main/java/org/apache/camel/component/kubernetes/springboot/ha/KubernetesClusterServiceConfiguration.java b/platforms/spring-boot/components-starter/camel-kubernetes-starter/src/main/java/org/apache/camel/component/kubernetes/springboot/ha/KubernetesClusterServiceConfiguration.java index 69580b7..42745a3 100644 --- a/platforms/spring-boot/components-starter/camel-kubernetes-starter/src/main/java/org/apache/camel/component/kubernetes/springboot/ha/KubernetesClusterServiceConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-kubernetes-starter/src/main/java/org/apache/camel/component/kubernetes/springboot/ha/KubernetesClusterServiceConfiguration.java @@ -84,6 +84,16 @@ public class KubernetesClusterServiceConfiguration { */ private Long retryPeriodMillis; + /** + * Custom service attributes. + */ + private Map<String, Object> attributes; + + /** + * Service lookup order/priority. + */ + private Integer order; + public boolean isEnabled() { return enabled; } @@ -180,4 +190,19 @@ public class KubernetesClusterServiceConfiguration { this.retryPeriodMillis = retryPeriodMillis; } + public Map<String, Object> getAttributes() { + return attributes; + } + + public void setAttributes(Map<String, Object> attributes) { + this.attributes = attributes; + } + + public Integer getOrder() { + return order; + } + + public void setOrder(Integer order) { + this.order = order; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceAutoConfiguration.java 
b/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceAutoConfiguration.java index de80367..f41656a 100644 --- a/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceAutoConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceAutoConfiguration.java @@ -24,7 +24,6 @@ import org.apache.camel.util.IntrospectionSupport; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.autoconfigure.AutoConfigureBefore; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -39,9 +38,8 @@ public class ZooKeeperClusterServiceAutoConfiguration { @Autowired private ZooKeeperClusterServiceConfiguration configuration; - @Bean(initMethod = "start", destroyMethod = "stop") + @Bean(name = "zookeeper-cluster-service") @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) - @ConditionalOnMissingBean public CamelClusterService zookeeperClusterService() throws Exception { ZooKeeperClusterService service = new ZooKeeperClusterService(); http://git-wip-us.apache.org/repos/asf/camel/blob/ab42e163/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceConfiguration.java ---------------------------------------------------------------------- diff --git 
a/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceConfiguration.java b/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceConfiguration.java index cb66058..e9d4265 100644 --- a/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-zookeeper-starter/src/main/java/org/apache/camel/component/zookeeper/ha/springboot/ZooKeeperClusterServiceConfiguration.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.zookeeper.ha.springboot; +import java.util.Map; + import org.apache.camel.component.zookeeper.ZooKeeperCuratorConfiguration; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -31,6 +33,16 @@ public class ZooKeeperClusterServiceConfiguration extends ZooKeeperCuratorConfig */ private String id; + /** + * Custom service attributes. + */ + private Map<String, Object> attributes; + + /** + * Service lookup order/priority. + */ + private Integer order; + public boolean isEnabled() { return enabled; } @@ -46,4 +58,20 @@ public class ZooKeeperClusterServiceConfiguration extends ZooKeeperCuratorConfig public void setId(String id) { this.id = id; } + + public Map<String, Object> getAttributes() { + return attributes; + } + + public void setAttributes(Map<String, Object> attributes) { + this.attributes = attributes; + } + + public Integer getOrder() { + return order; + } + + public void setOrder(Integer order) { + this.order = order; + } }