Repository: camel Updated Branches: refs/heads/master 052fcc601 -> 0a487f8af
CAMEL-10862: camel-consul - ConsultRoutePolicy - Allow to configure host port easier Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0a487f8a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0a487f8a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0a487f8a Branch: refs/heads/master Commit: 0a487f8af13e0120e275640998e22701b945b5da Parents: 052fcc6 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Mon Feb 20 10:21:56 2017 +0100 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Mon Feb 20 10:22:15 2017 +0100 ---------------------------------------------------------------------- .../camel/component/consul/ConsulConstants.java | 4 + .../consul/policy/ConsulRoutePolicy.java | 97 ++++++++++++-------- .../consul/policy/ConsulRoutePolicyMain.java | 6 +- 3 files changed, 69 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0a487f8a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java index 1265fa5..fd8ea39 100644 --- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java @@ -16,7 +16,11 @@ */ package org.apache.camel.component.consul; +import com.orbitz.consul.Consul; + public interface ConsulConstants { + String CONSUL_DEFAULT_URL = String.format("http://%s:%d", Consul.DEFAULT_HTTP_HOST, Consul.DEFAULT_HTTP_PORT); + // Service Call EIP String CONSUL_SERVER_IP = "CamelConsulServerIp"; String CONSUL_SERVER_PORT = "CamelConsulServerPort"; http://git-wip-us.apache.org/repos/asf/camel/blob/0a487f8a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java index 17c9c43..e7397a8 100644 --- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java @@ -37,53 +37,49 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Route; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.component.consul.ConsulConfiguration; +import org.apache.camel.component.consul.ConsulConstants; import org.apache.camel.support.RoutePolicySupport; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @ManagedResource(description = "Route policy using Consul as clustered lock") -public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContextAware { +public final class ConsulRoutePolicy extends RoutePolicySupport implements CamelContextAware { private static final Logger LOGGER = LoggerFactory.getLogger(ConsulRoutePolicy.class); - private final Object lock; - private final Consul consul; - private final SessionClient sessionClient; - private final KeyValueClient keyValueClient; - private final AtomicBoolean leader; - private final Set<Route> suspendedRoutes; - private final AtomicReference<BigInteger> index; + private final Object lock = new Object(); + private final AtomicBoolean leader = new AtomicBoolean(false); + private final Set<Route> suspendedRoutes = new HashSet<>(); + private final AtomicReference<BigInteger> index = new AtomicReference<>(BigInteger.valueOf(0)); private Route route; private CamelContext camelContext; private String serviceName; private String servicePath; - private int ttl; - private int lockDelay; private ExecutorService executorService; - private boolean shouldStopConsumer; + + private int ttl = 60; + private int lockDelay = 10; + private boolean shouldStopConsumer = true; + private String consulUrl = ConsulConstants.CONSUL_DEFAULT_URL; + + private Consul consul; + private SessionClient sessionClient; + private KeyValueClient keyValueClient; private String sessionId; public ConsulRoutePolicy() { - this(Consul.builder().build()); } - public ConsulRoutePolicy(Consul consul) { - this.consul = consul; - this.sessionClient = consul.sessionClient(); - this.keyValueClient = consul.keyValueClient(); - this.suspendedRoutes = new HashSet<>(); - this.leader = new AtomicBoolean(false); - this.lock = new Object(); - this.index = new AtomicReference<>(BigInteger.valueOf(0)); - this.serviceName = null; - this.servicePath = null; - this.ttl = 60; - this.lockDelay = 10; - this.executorService = null; - this.shouldStopConsumer = true; - this.sessionId = null; + public ConsulRoutePolicy(String consulUrl) { + this.consulUrl = consulUrl; + } + + public ConsulRoutePolicy(ConsulConfiguration configuration) throws Exception { + this.consulUrl = configuration.getUrl(); + this.consul = configuration.createConsulClient(); } @Override @@ -96,6 +92,14 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex this.camelContext = camelContext; } + public String getConsulUrl() { + return consulUrl; + } + + public void setConsulUrl(String consulUrl) { + this.consulUrl = consulUrl; + } + @Override public void onInit(Route route) { super.onInit(route); @@ -125,6 +129,26 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex @Override protected void doStart() throws Exception { + ObjectHelper.notNull(camelContext, "camelContext"); + ObjectHelper.notNull(serviceName, "serviceName"); + ObjectHelper.notNull(servicePath, "servicePath"); + + if (consul == null) { + Consul.Builder builder = Consul.builder(); + if (consulUrl != null) { + builder.withUrl(consulUrl); + } + + consul = builder.build(); + } + + if (sessionClient == null) { + sessionClient = consul.sessionClient(); + } + if (keyValueClient == null) { + keyValueClient = consul.keyValueClient(); + } + if (sessionId == null) { sessionId = sessionClient.createSession( ImmutableSession.builder() @@ -136,7 +160,7 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex LOGGER.debug("SessionID = {}", sessionId); if (executorService == null) { - executorService = getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "HazelcastRoutePolicy"); + executorService = getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "ConsulRoutePolicy"); } setLeader(keyValueClient.acquireLock(servicePath, sessionId)); @@ -154,10 +178,10 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex if (sessionId != null) { sessionClient.destroySession(sessionId); sessionId = null; + } - if (executorService != null) { - getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService); - } + if (executorService != null) { + getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService); } } @@ -293,10 +317,10 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex @Override public void onComplete(ConsulResponse<Optional<Value>> consulResponse) { if (isRunAllowed()) { - Value response = consulResponse.getResponse().orNull(); - if (response != null) { - String sid = response.getSession().orNull(); - if (ObjectHelper.isEmpty(sid)) { + Optional<Value> value = consulResponse.getResponse(); + if (value.isPresent()) { + Optional<String> sid = value.get().getSession(); + if (sid.isPresent() && ObjectHelper.isNotEmpty(sid.get())) { // If the key is not held by any session, try acquire a // lock (become leader) LOGGER.debug("Try to take leadership ..."); @@ -326,7 +350,8 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex keyValueClient.getValue( servicePath, QueryOptions.blockSeconds(ttl / 3, index.get()).build(), - this); + this + ); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/0a487f8a/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java index 447f5b3..b93181b 100644 --- a/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java +++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java @@ -17,6 +17,7 @@ package org.apache.camel.component.consul.policy; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.consul.ConsulConstants; import org.apache.camel.main.Main; public final class ConsulRoutePolicyMain { @@ -29,15 +30,16 @@ public final class ConsulRoutePolicyMain { main.addRouteBuilder(new RouteBuilder() { public void configure() { ConsulRoutePolicy policy = new ConsulRoutePolicy(); + policy.setConsulUrl(ConsulConstants.CONSUL_DEFAULT_URL); policy.setServiceName(args[0]); policy.setTtl(15); - fromF("file:///tmp/camel?delete=true") + from("file:///tmp/camel?delete=true") .routeId(args[1]) .routePolicy(policy) .setHeader("ConsulRouteID", simple("${routeId}")) .setHeader("ConsulServiceName", constant(args[0])) - .to("log:org.apache.camel.component.etcd?level=INFO&showAll=true"); + .to("log:org.apache.camel.component.consul?level=INFO&showAll=true"); } });