ignite-1203 - Zookeeper IP finder. Closes #16.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d69e7785 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d69e7785 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d69e7785 Branch: refs/heads/ignite-gg-9615-1 Commit: d69e7785afe15fe99599e992f37c742b8fa57ba8 Parents: 452af6a Author: Raul Kripalani <ra...@apache.org> Authored: Thu Aug 20 18:09:27 2015 -0700 Committer: Valentin Kulichenko <valentin.kuliche...@gmail.com> Committed: Thu Aug 20 18:09:27 2015 -0700 ---------------------------------------------------------------------- modules/zookeeper/pom.xml | 89 +++++ .../zk/TcpDiscoveryZookeeperIpFinder.java | 350 +++++++++++++++++ .../tcp/ipfinder/zk/ZookeeperIpFinderTest.java | 390 +++++++++++++++++++ pom.xml | 1 + 4 files changed, 830 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d69e7785/modules/zookeeper/pom.xml ---------------------------------------------------------------------- diff --git a/modules/zookeeper/pom.xml b/modules/zookeeper/pom.xml new file mode 100644 index 0000000..bf0c578 --- /dev/null +++ b/modules/zookeeper/pom.xml @@ -0,0 +1,89 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + POM file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-parent</artifactId> + <version>1</version> + <relativePath>../../parent</relativePath> + </parent> + + <artifactId>ignite-zookeeper</artifactId> + <version>1.4.1-SNAPSHOT</version> + + <properties> + <curator.version>2.8.0</curator.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <version>${curator.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-discovery</artifactId> + <version>${curator.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>${curator.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-log4j</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-spring</artifactId> + <version>${project.version}</version> + 
<scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d69e7785/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java new file mode 100644 index 0000000..dd1a4a7 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder.zk; + +import org.apache.curator.*; +import org.apache.curator.framework.*; +import org.apache.curator.framework.imps.*; +import org.apache.curator.retry.*; +import org.apache.curator.x.discovery.*; +import org.apache.curator.x.discovery.details.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + +import com.google.common.collect.*; +import org.codehaus.jackson.map.annotate.*; + +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * This TCP Discovery IP Finder uses Apache ZooKeeper (ZK) to locate peer nodes when bootstrapping in order to join + * the cluster. It uses the Apache Curator library to interact with ZooKeeper in a simple manner. Specifically, + * it uses the {@link ServiceDiscovery} recipe, which makes use of ephemeral nodes in ZK to register services. + * + * <p> + * There are several ways to instantiate the TcpDiscoveryZookeeperIpFinder: + * <li> + * <ul>By providing an instance of {@link CuratorFramework} directly, in which case no ZK Connection String + * is required.</ul> + * <ul>By providing a ZK Connection String through {@link #setZkConnectionString(String)}, and optionally + * a {@link RetryPolicy} through the setter. If the latter is not provided, a default + * {@link ExponentialBackoffRetry} policy is used, with a base sleep time of 1000ms and 10 retries.</ul> + * <ul>By providing a ZK Connection String through system property {@link #PROP_ZK_CONNECTION_STRING}. 
If this + * property is set, it overrides the ZK Connection String passed in as a property, but it does not override + * the {@link CuratorFramework} if provided.</ul> + * </li> + * + * You may customise the base path for services, as well as the service name. By default {@link #BASE_PATH} and + * {@link #SERVICE_NAME} are used respectively. You can also choose to enable or disable duplicate registrations. See + * {@link #setAllowDuplicateRegistrations(boolean)} for more details. + * + * @see <a href="http://zookeeper.apache.org">Apache ZooKeeper</a> + * @see <a href="http://curator.apache.org">Apache Curator</a> + * + * @author Raul Kripalani + */ +public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter { + + /** System property name to provide the ZK Connection String. */ + public static final String PROP_ZK_CONNECTION_STRING = "IGNITE_ZK_CONNECTION_STRING"; + + /** Default base path for service registrations. */ + private static final String BASE_PATH = "/services"; + + /** Default service name for service registrations. */ + private static final String SERVICE_NAME = "ignite"; + + /** Default URI Spec to use with the {@link ServiceDiscoveryBuilder}. */ + private static final UriSpec URI_SPEC = new UriSpec("{address}:{port}"); + + /** Init guard. */ + @GridToStringExclude + private final AtomicBoolean initGuard = new AtomicBoolean(); + + /** Logger. */ + @LoggerResource + private IgniteLogger log; + + /** The Curator framework in use, either injected or constructed by this component. */ + private CuratorFramework curator; + + /** The ZK Connection String if provided by the user. */ + private String zkConnectionString; + + /** Retry policy to use. */ + private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); + + /** Base path to use, by default {@link #BASE_PATH}. */ + private String basePath = BASE_PATH; + + /** Service name to use, by default {@link #SERVICE_NAME}. 
*/ + private String serviceName = SERVICE_NAME; + + /** Whether to allow or not duplicate registrations. See setter doc. */ + private boolean allowDuplicateRegistrations = false; + + /** The Service Discovery recipe. */ + private ServiceDiscovery<IgniteInstanceDetails> discovery; + + /** Map of the {#link ServiceInstance}s we have registered. */ + private Map<InetSocketAddress, ServiceInstance<IgniteInstanceDetails>> ourInstances = new ConcurrentHashMap<>(); + + /** Constructor. */ + public TcpDiscoveryZookeeperIpFinder() { + setShared(true); + } + + /** Initializes this IP Finder by creating the appropriate Curator objects. */ + private void init() { + if (!initGuard.compareAndSet(false, true)) + return; + + String sysPropZkConnString = System.getProperty(PROP_ZK_CONNECTION_STRING); + + if (sysPropZkConnString != null && sysPropZkConnString.trim().length() > 0) + zkConnectionString = sysPropZkConnString; + + log.info("Initializing ZooKeeper IP Finder."); + + if (curator == null) { + A.notNullOrEmpty(zkConnectionString, String.format("ZooKeeper URL (or system property %s) cannot be null " + + "or empty if a CuratorFramework object is not provided explicitly", PROP_ZK_CONNECTION_STRING)); + curator = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy); + } + + if (curator.getState() != CuratorFrameworkState.STARTED) + curator.start(); + + discovery = ServiceDiscoveryBuilder.builder(IgniteInstanceDetails.class) + .client(curator) + .basePath(basePath) + .serializer(new JsonInstanceSerializer<>(IgniteInstanceDetails.class)) + .build(); + } + + /** {@inheritDoc} */ + @Override public void onSpiContextDestroyed() { + if (!initGuard.compareAndSet(true, false)) + return; + + log.info("Destroying ZooKeeper IP Finder."); + + super.onSpiContextDestroyed(); + + if (curator != null) + curator.close(); + + } + + /** {@inheritDoc} */ + @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException { + init(); + + if 
(log.isDebugEnabled()) + log.debug("Getting registered addresses from ZooKeeper IP Finder."); + + Collection<ServiceInstance<IgniteInstanceDetails>> serviceInstances; + + try { + serviceInstances = discovery.queryForInstances(serviceName); + } catch (Exception e) { + log.warning("Error while getting registered addresses from ZooKeeper IP Finder.", e); + return Collections.emptyList(); + } + + Set<InetSocketAddress> answer = new HashSet<>(); + + for (ServiceInstance<IgniteInstanceDetails> si : serviceInstances) + answer.add(new InetSocketAddress(si.getAddress(), si.getPort())); + + log.info("ZooKeeper IP Finder resolved addresses: " + answer); + + return answer; + } + + /** {@inheritDoc} */ + @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + init(); + + log.info("Registering addresses with ZooKeeper IP Finder: " + addrs); + + Set<InetSocketAddress> registrationsToIgnore = Sets.newHashSet(); + if (!allowDuplicateRegistrations) { + try { + for (ServiceInstance<IgniteInstanceDetails> sd : discovery.queryForInstances(serviceName)) + registrationsToIgnore.add(new InetSocketAddress(sd.getAddress(), sd.getPort())); + } + catch (Exception e) { + log.warning("Error while finding currently registered services to avoid duplicate registrations", e); + throw new IgniteSpiException(e); + } + } + + for (InetSocketAddress addr : addrs) { + if (registrationsToIgnore.contains(addr)) + continue; + + try { + ServiceInstance<IgniteInstanceDetails> si = ServiceInstance.<IgniteInstanceDetails>builder() + .name(serviceName) + .uriSpec(URI_SPEC) + .address(addr.getAddress().getHostAddress()) + .port(addr.getPort()) + .build(); + + ourInstances.put(addr, si); + + discovery.registerService(si); + + } catch (Exception e) { + log.warning(String.format("Error while registering an address from ZooKeeper IP Finder " + + "[message=%s,addresses=%s]", e.getMessage(), addr), e); + } + } + } + + /** {@inheritDoc} */ + @Override public void 
unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + + // if curator is not STARTED, we have nothing to unregister, because we are using ephemeral nodes, + // which means that our addresses will only be registered in ZK as long as our connection is alive + if (curator.getState() != CuratorFrameworkState.STARTED) + return; + + log.info("Unregistering addresses with ZooKeeper IP Finder: " + addrs); + + for (InetSocketAddress addr : addrs) { + ServiceInstance<IgniteInstanceDetails> si = ourInstances.get(addr); + if (si == null) { + log.warning("Asked to unregister address from ZooKeeper IP Finder, but no match was found in local " + + "instance map for: " + addrs); + continue; + } + + try { + discovery.unregisterService(si); + } catch (Exception e) { + log.warning("Error while unregistering an address from ZooKeeper IP Finder: " + addr, e); + } + } + } + + /** + * @param curator A {@link CuratorFramework} instance to use. It can already be in <tt>STARTED</tt> state. + */ + public void setCurator(CuratorFramework curator) { + this.curator = curator; + } + + /** + * @return The ZooKeeper connection string, only if set explicitly. Else, it returns null. + */ + public String getZkConnectionString() { + return zkConnectionString; + } + + /** + * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set explicitly. + */ + public void setZkConnectionString(String zkConnectionString) { + this.zkConnectionString = zkConnectionString; + } + + /** + * @return Retry policy in use if, and only if, it was set explicitly. Else, it returns null. + */ + public RetryPolicy getRetryPolicy() { + return retryPolicy; + } + + /** + * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if + * using a system property. 
+ */ + public void setRetryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + } + + /** + * @return Base path for service registration in ZK. Default value: {@link #BASE_PATH}. + */ + public String getBasePath() { + return basePath; + } + + /** + * @param basePath Base path for service registration in ZK. If not passed, {@link #BASE_PATH} will be used. + */ + public void setBasePath(String basePath) { + this.basePath = basePath; + } + + /** + * @return Service name being used, in Curator terms. See {@link #setServiceName(String)} for more information. + */ + public String getServiceName() { + return serviceName; + } + + /** + * @param serviceName Service name to use, as defined by Curator's {@link ServiceDiscovery} recipe. In physical + * ZK terms, it represents the node under {@link #basePath}, under which services will be + * registered. + */ + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + /** + * @return The value of this flag. See {@link #setAllowDuplicateRegistrations(boolean)} for more details. + */ + public boolean isAllowDuplicateRegistrations() { + return allowDuplicateRegistrations; + } + + /** + * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations + * are allowed. Nodes will attempt to register themselves, plus those they + * know about. By default, duplicate registrations are not allowed, but you + * might want to set this property to <tt>true</tt> if you have multiple + * network interfaces or if you are facing troubles. + */ + public void setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) { + this.allowDuplicateRegistrations = allowDuplicateRegistrations; + } + + /** + * Empty DTO for storing service instance details. Currently acting as a placeholder because Curator requires + * a payload type when registering and discovering nodes. May be enhanced in the future with further information + * to assist discovery. 
+ * + * @author Raul Kripalani + */ + @JsonRootName("ignite_instance_details") + private class IgniteInstanceDetails { + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d69e7785/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java new file mode 100644 index 0000000..ce059a3 --- /dev/null +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder.zk; + +import org.apache.curator.framework.*; +import org.apache.curator.retry.*; +import org.apache.curator.test.*; +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Test for {@link TcpDiscoveryZookeeperIpFinder}. + * + * @author Raul Kripalani + */ +public class ZookeeperIpFinderTest extends GridCommonAbstractTest { + + /** ZK Cluster size. */ + private static final int ZK_CLUSTER_SIZE = 3; + + /** ZK Path size. */ + private static final String SERVICES_IGNITE_ZK_PATH = "/services/ignite"; + + /** The ZK cluster instance, from curator-test. */ + private TestingCluster zkCluster; + + /** A Curator client to perform assertions on the embedded ZK instances. */ + private CuratorFramework zkCurator; + + /** Whether to allow duplicate registrations for the current test method or not. */ + private boolean allowDuplicateRegistrations = false; + + /** Constructor that does not start any grids. */ + public ZookeeperIpFinderTest() { + super(false); + } + + /** + * Before test. + * @throws Exception + */ + @Override public void beforeTest() throws Exception { + super.beforeTest(); + + // remove stale system properties + System.getProperties().remove(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING); + + // start the ZK cluster + zkCluster = new TestingCluster(ZK_CLUSTER_SIZE); + zkCluster.start(); + + // start the Curator client so we can perform assertions on the ZK state later + zkCurator = CuratorFrameworkFactory.newClient(zkCluster.getConnectString(), new RetryNTimes(10, 1000)); + zkCurator.start(); + + } + + /** + * After test. 
+ * @throws Exception + */ + @Override public void afterTest() throws Exception { + super.afterTest(); + + if (zkCurator != null) + zkCurator.close(); + + if (zkCluster != null) { + zkCluster.stop(); + zkCluster.close(); + } + + stopAllGrids(); + + } + + /** + * Enhances the default configuration with the {#TcpDiscoveryZookeeperIpFinder}. + * + * @param gridName Grid name. + * @return + * @throws Exception + */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration configuration = super.getConfiguration(gridName); + + TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi) configuration.getDiscoverySpi(); + TcpDiscoveryZookeeperIpFinder zkIpFinder = new TcpDiscoveryZookeeperIpFinder(); + zkIpFinder.setAllowDuplicateRegistrations(isAllowDuplicateRegistrations()); + + // first node => configure with zkUrl; second node => configure with CuratorFramework; third and subsequent + // shall be configured through system property + if (gridName.equals(getTestGridName(0))) { + zkIpFinder.setZkConnectionString(zkCluster.getConnectString()); + } + else if (gridName.equals(getTestGridName(1))) { + zkIpFinder.setCurator(CuratorFrameworkFactory.newClient(zkCluster.getConnectString(), + new ExponentialBackoffRetry(100, 5))); + } + + tcpDisco.setIpFinder(zkIpFinder); + return configuration; + } + + /** + * @throws Exception + */ + public void testOneIgniteNodeIsAlone() throws Exception { + startGrid(0); + + assertEquals(1, grid(0).cluster().metrics().getTotalNodes()); + + stopAllGrids(); + } + + /** + * @throws Exception + */ + public void testTwoIgniteNodesFindEachOther() throws Exception { + // start one node + startGrid(0); + + // set up an event listener to expect one NODE_JOINED event + CountDownLatch latch = expectJoinEvents(grid(0), 1); + + // start the other node + startGrid(1); + + // assert the nodes see each other + assertEquals(2, grid(0).cluster().metrics().getTotalNodes()); + assertEquals(2, 
grid(1).cluster().metrics().getTotalNodes()); + + // assert the event listener got as many events as expected + latch.await(1, TimeUnit.SECONDS); + + stopAllGrids(); + } + + /** + * @throws Exception + */ + public void testThreeNodesWithThreeDifferentConfigMethods() throws Exception { + // start one node + startGrid(0); + + // set up an event listener to expect one NODE_JOINED event + CountDownLatch latch = expectJoinEvents(grid(0), 2); + + // start the 2nd node + startGrid(1); + + // start the 3rd node, first setting the system property + System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString()); + startGrid(2); + + // wait until all grids are started + waitForRemoteNodes(grid(0), 2); + + // assert the nodes see each other + assertEquals(3, grid(0).cluster().metrics().getTotalNodes()); + assertEquals(3, grid(1).cluster().metrics().getTotalNodes()); + assertEquals(3, grid(2).cluster().metrics().getTotalNodes()); + + // assert the event listener got as many events as expected + latch.await(1, TimeUnit.SECONDS); + + stopAllGrids(); + } + + /** + * @throws Exception + */ + public void testFourNodesStartingAndStopping() throws Exception { + // start one node + startGrid(0); + + // set up an event listener to expect one NODE_JOINED event + CountDownLatch latch = expectJoinEvents(grid(0), 3); + + // start the 2nd node + startGrid(1); + + // start the 3rd & 4th nodes, first setting the system property + System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString()); + startGrid(2); + startGrid(3); + + // wait until all grids are started + waitForRemoteNodes(grid(0), 3); + + // assert the nodes see each other + assertEquals(4, grid(0).cluster().metrics().getTotalNodes()); + assertEquals(4, grid(1).cluster().metrics().getTotalNodes()); + assertEquals(4, grid(2).cluster().metrics().getTotalNodes()); + assertEquals(4, grid(3).cluster().metrics().getTotalNodes()); + + // assert the event 
listener got as many events as expected + latch.await(1, TimeUnit.SECONDS); + + // stop the first grid + stopGrid(0); + + // make sure that nodes were synchronized; they should only see 3 now + assertEquals(3, grid(1).cluster().metrics().getTotalNodes()); + assertEquals(3, grid(2).cluster().metrics().getTotalNodes()); + assertEquals(3, grid(3).cluster().metrics().getTotalNodes()); + + // stop all remaining grids + stopGrid(1); + stopGrid(2); + stopGrid(3); + + // check that the nodes are gone in ZK + assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size()); + } + + /** + * @throws Exception + */ + public void testFourNodesWithDuplicateRegistrations() throws Exception { + setAllowDuplicateRegistrations(true); + + // start 4 nodes + System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString()); + startGrids(4); + + // wait until all grids are started + waitForRemoteNodes(grid(0), 3); + + // each node will register itself + the node that it connected to to join the cluster + assertEquals(7, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size()); + + // stop all grids + stopAllGrids(); + + // check that all nodes are gone in ZK + assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size()); + } + + /** + * @throws Exception + */ + public void testFourNodesWithNoDuplicateRegistrations() throws Exception { + setAllowDuplicateRegistrations(false); + + // start 4 nodes + System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString()); + startGrids(4); + + // wait until all grids are started + waitForRemoteNodes(grid(0), 3); + + // each node will only register itself + assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size()); + + // stop all grids + stopAllGrids(); + + // check that all nodes are gone in ZK + assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size()); + } + + /** + * @throws 
Exception + */ + public void testFourNodesRestartLastSeveralTimes() throws Exception { + setAllowDuplicateRegistrations(false); + + // start 4 nodes + System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString()); + startGrids(4); + + // wait until all grids are started + waitForRemoteNodes(grid(0), 3); + + // each node will only register itself + assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size()); + + // repeat 5 times + for (int i = 0; i < 5; i++) { + // stop last grid + stopGrid(2); + + // check that the node has unregistered itself and its party + assertEquals(3, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size()); + + // start the node again + startGrid(2); + + // check that the node back in ZK + assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size()); + } + + stopAllGrids(); + + assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size()); + + } + + /** + * @throws Exception + */ + public void testFourNodesKillRestartZookeeper() throws Exception { + setAllowDuplicateRegistrations(false); + + // start 4 nodes + System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString()); + startGrids(4); + + // wait until all grids are started + waitForRemoteNodes(grid(0), 3); + + // each node will only register itself + assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size()); + + // remember ZK server configuration and stop the cluster + Collection<InstanceSpec> instances = zkCluster.getInstances(); + zkCluster.stop(); + Thread.sleep(1000); + + // start the cluster with the previous configuration + zkCluster = new TestingCluster(instances); + zkCluster.start(); + + // block the client until connected + zkCurator.blockUntilConnected(); + + // check that the nodes have registered again + assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size()); + + // 
stop all grids + stopAllGrids(); + Thread.sleep(2000); + + // check that all nodes are gone in ZK + assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size()); + } + + /** + * @throws Exception + */ + private CountDownLatch expectJoinEvents(Ignite ignite, int joinEventCount) { + final CountDownLatch latch = new CountDownLatch(joinEventCount); + + ignite.events().remoteListen(new IgniteBiPredicate<UUID, Event>() { + @Override public boolean apply(UUID uuid, Event event) { + latch.countDown(); + return true; + } + }, null, EventType.EVT_NODE_JOINED); + + return latch; + } + + /** + * @throws Exception + */ + public void setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) { + this.allowDuplicateRegistrations = allowDuplicateRegistrations; + } + + /** + * @throws Exception + */ + public boolean isAllowDuplicateRegistrations() { + return allowDuplicateRegistrations; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d69e7785/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f3a5d65..fa3eaa4 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,7 @@ <module>modules/kafka</module> <module>modules/yarn</module> <module>modules/jms11</module> + <module>modules/zookeeper</module> </modules> <profiles>