Author: ningjiang Date: Mon Sep 3 08:58:31 2012 New Revision: 1380174 URL: http://svn.apache.org/viewvc?rev=1380174&view=rev Log: CAMEL-5546 Applied the patch with thanks to Andrew
Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java Modified: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java?rev=1380174&view=auto ============================================================================== --- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java (added) +++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java Mon Sep 3 08:58:31 2012 @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper.policy; + + +public interface ElectionWatcher { + /** + * This method is called when there is a potential change to the master. + * Implementations should call "isMaster" on their ZookeeperElection + * instance to re-validate their status. + */ + void electionResultChanged(); +} Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java?rev=1380174&view=auto ============================================================================== --- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java (added) +++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java Mon Sep 3 08:58:31 2012 @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper.policy; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.zookeeper.SequenceComparator; +import org.apache.camel.component.zookeeper.ZooKeeperEndpoint; +import org.apache.camel.component.zookeeper.ZooKeeperMessage; +import org.apache.camel.impl.JavaUuidGenerator; +import org.apache.camel.spi.UuidGenerator; + +import org.apache.zookeeper.CreateMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <code>ZooKeeperElection</code> uses the leader election capabilities of a + * ZooKeeper cluster to control which nodes are enabled. It is typically used in + * fail-over scenarios controlling identical instances of an application across + * a cluster of Camel based servers. <p> The election is configured with a 'top + * n' number of servers that should be marked as master, for a simple + * master/slave scenario this would be 1. Each instance will execute the + * election algorithm to obtain its position in the hierarchy of servers, if it + * is within the 'top n' servers then the node is enabled and isMaster() will + * return 'true'. If not it waits for a change in the leader hierarchy and then + * reruns this scenario to see if it is now in the top n. <p> All instances of + * the election must also be configured with the same path on the ZooKeeper + * cluster where the election will be carried out. It is good practice for this + * to indicate the application e.g. <tt>/someapplication/someroute/</tt> note + * that these nodes should exist before using the election. <p> See <a + * href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection"> + * for more on how Leader election</a> is archived with ZooKeeper. + */ +public class ZooKeeperElection { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperElection.class); + private final ProducerTemplate producerTemplate; + private final CamelContext camelContext; + private final String uri; + private final String candidateName; + private final Lock lock = new ReentrantLock(); + private final CountDownLatch electionComplete = new CountDownLatch(1); + private AtomicBoolean masterNode = new AtomicBoolean(); + private volatile boolean isCandidateCreated; + private int enabledCount = 1; + private UuidGenerator uuidGenerator = new JavaUuidGenerator(); + private final List<ElectionWatcher> watchers = new ArrayList<ElectionWatcher>(); + + public ZooKeeperElection(CamelContext camelContext, String uri, int enabledCount) { + this(camelContext.createProducerTemplate(), camelContext, uri, enabledCount); + } + + public ZooKeeperElection(ProducerTemplate producerTemplate, CamelContext camelContext, String uri, int enabledCount) { + this.camelContext = camelContext; + this.producerTemplate = producerTemplate; + this.uri = uri; + this.enabledCount = enabledCount; + this.candidateName = createCandidateName(); + } + + public boolean isMaster() { + if (!isCandidateCreated) { + testAndCreateCandidateNode(); + awaitElectionResults(); + + } + return masterNode.get(); + } + + private String createCandidateName() { + StringBuilder builder = new StringBuilder(); + try { + /* UUID would be enough, also using hostname for human readability */ + builder.append(InetAddress.getLocalHost().getCanonicalHostName()); + } catch (UnknownHostException ex) { + LOG.warn("Failed to get the local hostname.", ex); + builder.append("unknown-host"); + } + builder.append("-").append(uuidGenerator.generateUuid()); + return builder.toString(); + } + + private void testAndCreateCandidateNode() { + try { + lock.lock(); + if (!isCandidateCreated) { + createCandidateNode(camelContext); + isCandidateCreated = true; + } + } catch (Exception e) { + handleException(e); + } finally { + lock.unlock(); + } + } + + private void awaitElectionResults() { + while (electionComplete.getCount() > 0) { + try { + LOG.debug("Awaiting election results..."); + electionComplete.await(); + } catch (InterruptedException e1) { + } + } + } + + private ZooKeeperEndpoint createCandidateNode(CamelContext camelContext) { + LOG.info("Initializing ZookeeperElection with uri '{}'", uri); + ZooKeeperEndpoint zep = camelContext.getEndpoint(uri, ZooKeeperEndpoint.class); + zep.getConfiguration().setCreate(true); + String fullpath = createFullPathToCandidate(zep); + Exchange e = zep.createExchange(); + e.setPattern(ExchangePattern.InOut); + e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_NODE, fullpath); + e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_CREATE_MODE, CreateMode.EPHEMERAL_SEQUENTIAL); + producerTemplate.send(zep, e); + + if (e.isFailed()) { + LOG.error("Error setting up election node " + fullpath, e.getException()); + } else { + LOG.info("Candidate node '{}' has been created", fullpath); + try { + if (zep != null) { + camelContext.addRoutes(new ElectoralMonitorRoute(zep)); + } + } catch (Exception ex) { + LOG.error("Error configuring ZookeeperElection", ex); + } + } + return zep; + + } + + private String createFullPathToCandidate(ZooKeeperEndpoint zep) { + String fullpath = zep.getConfiguration().getPath(); + if (!fullpath.endsWith("/")) { + fullpath += "/"; + } + fullpath += candidateName; + return fullpath; + } + + private void handleException(Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + + private void notifyElectionWatchers() { + for (ElectionWatcher watcher : watchers) { + try { + watcher.electionResultChanged(); + } catch (Exception e) { + LOG.warn("Election watcher " + watcher + " of type " + watcher.getClass() + " threw an exception.", e); + } + } + } + + public boolean addElectionWatcher(ElectionWatcher e) { + return watchers.add(e); + } + + public boolean removeElectionWatcher(ElectionWatcher o) { + return watchers.remove(o); + } + + private class ElectoralMonitorRoute extends RouteBuilder { + + private SequenceComparator comparator = new SequenceComparator(); + private ZooKeeperEndpoint zep; + + public ElectoralMonitorRoute(ZooKeeperEndpoint zep) { + this.zep = zep; + zep.getConfiguration().setListChildren(true); + zep.getConfiguration().setSendEmptyMessageOnDelete(true); + zep.getConfiguration().setRepeat(true); + } + + @Override + public void configure() throws Exception { + + /** + * TODO: this is cheap cheerful but suboptimal; it suffers from the + * 'herd effect' that on any change to the candidates list every + * policy instance will ask for the entire candidate list again. + * This is fine for small numbers of nodes (for scenarios like + * Master-Slave it is perfect) but could get noisy if large numbers + * of nodes were involved. <p> Better would be to find the position + * of this node in the list and watch the node in the position ahead + * node ahead of this and only request the candidate list when its + * status changes. This will require enhancing the consumer to allow + * custom operation lists. + */ + from(zep).id("election-route-" + candidateName.substring(0, 8)).sort(body(), comparator).process(new Processor() { + @Override + public void process(Exchange e) throws Exception { + @SuppressWarnings("unchecked") + List<String> candidates = e.getIn().getMandatoryBody(List.class); + + int location = Math.abs(Collections.binarySearch(candidates, candidateName)); + /** + * check if the item at this location starts with this nodes + * candidate name + */ + if (isOurCandidateAtLocationInCandidatesList(candidates, location)) { + + masterNode.set(location <= enabledCount); + LOG.debug( + "This node is number '{}' on the candidate list, election is configured for the top '{}'. this node will be {}", + new Object[]{location, enabledCount, masterNode.get() ? "enabled" : "disabled"} + ); + } + electionComplete.countDown(); + + notifyElectionWatchers(); + } + + private boolean isOurCandidateAtLocationInCandidatesList(List<String> candidates, int location) { + return location <= candidates.size() && candidates.get(location - 1).startsWith(candidateName); + } + }); + } + } +} Modified: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java?rev=1380174&r1=1380173&r2=1380174&view=diff ============================================================================== --- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java (original) +++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java Mon Sep 3 08:58:31 2012 @@ -16,32 +16,15 @@ */ package org.apache.camel.component.zookeeper.policy; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Collections; -import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.apache.camel.CamelContext; import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.Processor; -import org.apache.camel.ProducerTemplate; import org.apache.camel.Route; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.zookeeper.SequenceComparator; -import org.apache.camel.component.zookeeper.ZooKeeperEndpoint; -import org.apache.camel.component.zookeeper.ZooKeeperMessage; -import org.apache.camel.impl.JavaUuidGenerator; import org.apache.camel.impl.RoutePolicySupport; -import org.apache.camel.spi.UuidGenerator; - -import org.apache.zookeeper.CreateMode; /** * <code>ZooKeeperRoutePolicy</code> uses the leader election capabilities of a @@ -65,81 +48,57 @@ import org.apache.zookeeper.CreateMode; * See <a href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection"> * for more on how Leader election</a> is archived with ZooKeeper. */ -public class ZooKeeperRoutePolicy extends RoutePolicySupport { +public class ZooKeeperRoutePolicy extends RoutePolicySupport implements ElectionWatcher { private final String uri; private final int enabledCount; - private String candidateName; private final Lock lock = new ReentrantLock(); - private final CountDownLatch electionComplete = new CountDownLatch(1); private final Set<Route> suspendedRoutes = new CopyOnWriteArraySet<Route>(); private final AtomicBoolean shouldProcessExchanges = new AtomicBoolean(); - private ProducerTemplate template; private volatile boolean shouldStopConsumer = true; - private final UuidGenerator uuidGenerator = new JavaUuidGenerator(); - private volatile boolean isCandidateCreated; + + private final Lock electionLock = new ReentrantLock(); + private ZooKeeperElection election; public ZooKeeperRoutePolicy(String uri, int enabledCount) { this.uri = uri; this.enabledCount = enabledCount; - createCandidateName(); } - private void createCandidateName() { - /** UUID would be enough, also using hostname for human readability */ - StringBuilder b = new StringBuilder(fetchHostname()); - b.append("-").append(uuidGenerator.generateUuid()); - this.candidateName = b.toString(); - } - - private String fetchHostname() { - try { - return InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException ex) { - log.warn("Unable to determine the local hostname, using a default.", ex); - return "default"; - } + public ZooKeeperRoutePolicy(ZooKeeperElection election) { + this.election = election; + this.uri = null; + this.enabledCount = -1; } @Override public void onExchangeBegin(Route route, Exchange exchange) { - testAndCreateCandidateNode(route); + ensureElectionIsCreated(route); - awaitElectionResults(); - if (!shouldProcessExchanges.get()) { + if (election.isMaster()) { if (shouldStopConsumer) { - stopConsumer(route); + startConsumer(route); } - - IllegalStateException e = new IllegalStateException("Zookeeper based route policy prohibits processing exchanges, stopping route and failing the exchange"); - exchange.setException(e); - } else { if (shouldStopConsumer) { - startConsumer(route); + stopConsumer(route); } - } - } - private void testAndCreateCandidateNode(Route route) { - try { - lock.lock(); - if (!isCandidateCreated) { - createCandidateNode(route.getRouteContext().getCamelContext()); - isCandidateCreated = true; - } - } catch (Exception e) { - handleException(e); - } finally { - lock.unlock(); + IllegalStateException e = new IllegalStateException("Zookeeper based route policy prohibits processing exchanges, stopping route and failing the exchange"); + exchange.setException(e); } } - private void awaitElectionResults() { - while (electionComplete.getCount() > 0) { + private void ensureElectionIsCreated(Route route) { + if (election == null) { + electionLock.lock(); try { - electionComplete.await(); - } catch (InterruptedException e1) { + if (election == null) { // re-test + election = new ZooKeeperElection(route.getRouteContext().getCamelContext(), uri, enabledCount); + election.addElectionWatcher(this); + } + } finally { + electionLock.unlock(); } } } @@ -173,6 +132,13 @@ public class ZooKeeperRoutePolicy extend } } + @Override + public void electionResultChanged() { + if (election.isMaster()) { + startAllStoppedConsumers(); + } + } + private void startAllStoppedConsumers() { try { lock.lock(); @@ -200,99 +166,4 @@ public class ZooKeeperRoutePolicy extend public void setShouldStopConsumer(boolean shouldStopConsumer) { this.shouldStopConsumer = shouldStopConsumer; } - - private ZooKeeperEndpoint createCandidateNode(CamelContext camelContext) { - this.template = camelContext.createProducerTemplate(); - log.info("Initializing ZookeeperRoutePolicy with uri {}", uri); - - ZooKeeperEndpoint zep = camelContext.getEndpoint(uri, ZooKeeperEndpoint.class); - zep.getConfiguration().setCreate(true); - String fullpath = createFullPathToCandidate(zep); - Exchange e = zep.createExchange(); - e.setPattern(ExchangePattern.InOut); - e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_NODE, fullpath); - e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_CREATE_MODE, CreateMode.EPHEMERAL_SEQUENTIAL); - template.send(zep, e); - - if (e.isFailed()) { - log.warn("Error setting up election node " + fullpath, e.getException()); - } else { - log.info("Candidate node {} has been created", fullpath); - try { - camelContext.addRoutes(new ElectoralMonitorRoute(zep)); - } catch (Exception ex) { - log.warn("Error configuring ZookeeperRoutePolicy. This exception is ignored.", ex); - } - } - return zep; - - } - - private String createFullPathToCandidate(ZooKeeperEndpoint zep) { - String fullpath = zep.getConfiguration().getPath(); - if (!fullpath.endsWith("/")) { - fullpath += "/"; - } - fullpath += candidateName; - return fullpath; - } - - private class ElectoralMonitorRoute extends RouteBuilder { - - private SequenceComparator comparator = new SequenceComparator(); - - private ZooKeeperEndpoint zep; - - public ElectoralMonitorRoute(ZooKeeperEndpoint zep) { - this.zep = zep; - zep.getConfiguration().setListChildren(true); - zep.getConfiguration().setRepeat(true); - } - - @Override - public void configure() throws Exception { - - /** - * TODO: this is cheap cheerful but suboptimal; it suffers from the - * 'herd effect' that on any change to the candidates list every - * policy instance will ask for the entire candidate list again. - * This is fine for small numbers of nodes (for scenarios - * like Master-Slave it is perfect) but could get noisy if - * large numbers of nodes were involved. - * <p> - * Better would be to find the position of this node in the list and - * watch the node in the position ahead node ahead of this and only - * request the candidate list when its status changes. This will - * require enhancing the consumer to allow custom operation lists. - */ - from(zep).sort(body(), comparator).process(new Processor() { - - public void process(Exchange e) throws Exception { - @SuppressWarnings("unchecked") - List<String> candidates = e.getIn().getMandatoryBody(List.class); - - int location = Math.abs(Collections.binarySearch(candidates, candidateName)); - /** - * check if the item at this location starts with this nodes - * candidate name - */ - if (isOurCandidateAtLocationInCandidatesList(candidates, location)) { - - shouldProcessExchanges.set(location <= enabledCount); - if (log.isDebugEnabled()) { - log.debug("This node is number {} on the candidate list, route is configured for the top {}. Exchange processing will be {}", - new Object[]{location, enabledCount, shouldProcessExchanges.get() ? "enabled" : "disabled"}); - } - startAllStoppedConsumers(); - } - electionComplete.countDown(); - } - - private boolean isOurCandidateAtLocationInCandidatesList(List<String> candidates, int location) { - return location <= candidates.size() && candidates.get(location - 1).startsWith(candidateName); - } - }); - } - } - } Modified: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java?rev=1380174&r1=1380173&r2=1380174&view=diff ============================================================================== --- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java (original) +++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java Mon Sep 3 08:58:31 2012 @@ -206,6 +206,15 @@ public class ZooKeeperTestSupport extend } } + public void deleteAll(String node) throws Exception { + delay(200); + log.debug("Deleting {} and it's immediate children", node); + for (String child : zk.getChildren(node, false)) { + delete(node + "/" + child); + } + delete(node); + } + public void delete(String node) throws Exception { delay(200); log.debug("Deleting node " + node); Modified: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java?rev=1380174&r1=1380173&r2=1380174&view=diff ============================================================================== --- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java (original) +++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java Mon Sep 3 08:58:31 2012 @@ -27,8 +27,11 @@ import org.apache.camel.component.zookee import org.apache.camel.impl.DefaultCamelContext; import org.apache.commons.logging.LogFactory; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FailoverRoutePolicyTest extends ZooKeeperTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(FailoverRoutePolicyTest.class); protected CamelContext createCamelContext() throws Exception { disableJMX(); @@ -76,6 +79,7 @@ public class FailoverRoutePolicyTest ext template.sendBody("vm:" + routename, ExchangePattern.InOut, message); } catch (Exception e) { if (expected > 0) { + LOG.error(e.getMessage(), e); fail("Expected messages..."); } } Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java?rev=1380174&view=auto ============================================================================== --- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java (added) +++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java Mon Sep 3 08:58:31 2012 @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper.policy; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.camel.component.zookeeper.ZooKeeperTestSupport; +import org.apache.camel.impl.DefaultCamelContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZookeeperElectionTest extends ZooKeeperTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperElectionTest.class); + + private static final String NODE_BASE_KEY = "/someapp"; + private static final String NODE_PARTICULAR_KEY = "/someapp/somepolicy"; + private static final String ELECTION_URI = "zookeeper:localhost:39913/someapp/somepolicy"; + + @Before + public void before() throws Exception { + // set up the parent used to control the election + client.createPersistent(NODE_BASE_KEY, "App node to contain policy election nodes..."); + client.createPersistent(NODE_PARTICULAR_KEY, "Policy node used by route policy to control routes..."); + } + + @After + public void after() throws Exception { + client.deleteAll(NODE_PARTICULAR_KEY); + client.delete(NODE_BASE_KEY); + } + + @Test + public void masterCanBeElected() throws Exception { + ZooKeeperElection candidate = new ZooKeeperElection(template, context, ELECTION_URI, 1); + assertTrue("The only election candidate was not elected as master.", candidate.isMaster()); + } + + @Test + public void masterAndSlave() throws Exception { + final DefaultCamelContext candidateOneContext = createNewContext(); + final DefaultCamelContext candidateTwoContext = createNewContext(); + + ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 1); + assertTrue("The first candidate was not elected.", electionCandidate1.isMaster()); + ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 1); + assertFalse("The second candidate should not have been elected.", electionCandidate2.isMaster()); + } + + @Test + public void testMasterGoesAway() throws Exception { + final DefaultCamelContext candidateOneContext = createNewContext(); + final DefaultCamelContext candidateTwoContext = createNewContext(); + + ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 1); + assertTrue("The first candidate was not elected.", electionCandidate1.isMaster()); + ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 1); + assertFalse("The second candidate should not have been elected.", electionCandidate2.isMaster()); + + LOG.debug("About to shutdown the first candidate."); + + candidateOneContext.stop(); // the first candidate was killed. + + delay(3000); // more than the timeout on the zeekeeper server. + assertTrue("The second candidate should have been elected.", electionCandidate2.isMaster()); + } + + @Test + public void testDualMaster() throws Exception { + final DefaultCamelContext candidateOneContext = createNewContext(); + final DefaultCamelContext candidateTwoContext = createNewContext(); + + ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 2); + assertTrue("The first candidate was not elected.", electionCandidate1.isMaster()); + ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 2); + assertTrue("The second candidate should also be a master.", electionCandidate2.isMaster()); + } + + @Test + public void testWatchersAreNotified() throws Exception { + final DefaultCamelContext candidateOneContext = createNewContext(); + final DefaultCamelContext candidateTwoContext = createNewContext(); + + final AtomicBoolean notified = new AtomicBoolean(false); + ElectionWatcher watcher = new ElectionWatcher() { + @Override public void electionResultChanged() { notified.set(true); } + }; + + ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 2); + assertTrue("The first candidate was not elected.", electionCandidate1.isMaster()); + electionCandidate1.addElectionWatcher(watcher); + ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 2); + electionCandidate2.isMaster(); + assertTrue("The first candidate should have had it's watcher notified", notified.get()); + } + + private DefaultCamelContext createNewContext() throws Exception { + DefaultCamelContext controlledContext = new DefaultCamelContext(); + controlledContext.start(); + return controlledContext; + } + + private ZooKeeperElection createElectionCandidate(final DefaultCamelContext context, int masterCount) { + return new ZooKeeperElection(context.createProducerTemplate(), context, ELECTION_URI, masterCount); + } +}